diff --git a/src/__tests__/engine-happy-path.test.ts b/src/__tests__/engine-happy-path.test.ts index 2a201ae..3519ccf 100644 --- a/src/__tests__/engine-happy-path.test.ts +++ b/src/__tests__/engine-happy-path.test.ts @@ -384,7 +384,90 @@ describe('WorkflowEngine Integration: Happy Path', () => { }); // ===================================================== - // 7. Config validation + // 7. Phase events + // ===================================================== + describe('Phase events', () => { + it('should emit phase:start and phase:complete events for Phase 1', async () => { + const simpleConfig: WorkflowConfig = { + name: 'test', + maxIterations: 10, + initialStep: 'plan', + steps: [ + makeStep('plan', { + rules: [makeRule('done', 'COMPLETE')], + }), + ], + }; + const engine = new WorkflowEngine(simpleConfig, tmpDir, 'test task', { projectCwd: tmpDir }); + + mockRunAgentSequence([ + makeResponse({ agent: 'plan', content: 'Plan done' }), + ]); + mockDetectMatchedRuleSequence([ + { index: 0, method: 'phase1_tag' }, + ]); + + const phaseStartFn = vi.fn(); + const phaseCompleteFn = vi.fn(); + engine.on('phase:start', phaseStartFn); + engine.on('phase:complete', phaseCompleteFn); + + await engine.run(); + + expect(phaseStartFn).toHaveBeenCalledWith( + expect.objectContaining({ name: 'plan' }), + 1, 'execute', expect.any(String) + ); + expect(phaseCompleteFn).toHaveBeenCalledWith( + expect.objectContaining({ name: 'plan' }), + 1, 'execute', expect.any(String), 'done', undefined + ); + }); + + it('should emit phase events for all steps in happy path', async () => { + const config = buildDefaultWorkflowConfig(); + const engine = new WorkflowEngine(config, tmpDir, 'test task', { projectCwd: tmpDir }); + + mockRunAgentSequence([ + makeResponse({ agent: 'plan', content: 'Plan' }), + makeResponse({ agent: 'implement', content: 'Impl' }), + makeResponse({ agent: 'ai_review', content: 'OK' }), + makeResponse({ agent: 'arch-review', content: 'OK' }), + makeResponse({ agent: 'security-review', content: 'OK' }), + makeResponse({ agent: 'supervise', content: 'Pass' }), + ]); + + mockDetectMatchedRuleSequence([ + { index: 0, method: 'phase1_tag' }, + { index: 0, method: 'phase1_tag' }, + { index: 0, method: 'phase1_tag' }, + { index: 0, method: 'phase1_tag' }, + { index: 0, method: 'phase1_tag' }, + { index: 0, method: 'aggregate' }, + { index: 0, method: 'phase1_tag' }, + ]); + + const phaseStartFn = vi.fn(); + const phaseCompleteFn = vi.fn(); + engine.on('phase:start', phaseStartFn); + engine.on('phase:complete', phaseCompleteFn); + + await engine.run(); + + // 4 normal steps + 2 parallel sub-steps = 6 Phase 1 invocations + expect(phaseStartFn).toHaveBeenCalledTimes(6); + expect(phaseCompleteFn).toHaveBeenCalledTimes(6); + + // All calls should be Phase 1 (execute) since report/judgment are mocked off + for (const call of phaseStartFn.mock.calls) { + expect(call[1]).toBe(1); + expect(call[2]).toBe('execute'); + } + }); + }); + + // ===================================================== + // 8. Config validation // ===================================================== describe('Config validation', () => { it('should throw when initial step does not exist', () => { diff --git a/src/__tests__/session.test.ts b/src/__tests__/session.test.ts index 183087e..100ebb5 100644 --- a/src/__tests__/session.test.ts +++ b/src/__tests__/session.test.ts @@ -19,6 +19,10 @@ import { type NdjsonStepComplete, type NdjsonWorkflowComplete, type NdjsonWorkflowAbort, + type NdjsonPhaseStart, + type NdjsonPhaseComplete, + type NdjsonInteractiveStart, + type NdjsonInteractiveEnd, } from '../infra/fs/session.js'; /** Create a temp project directory with .takt/logs structure */ @@ -445,4 +449,193 @@ describe('NDJSON log', () => { } }); }); + + describe('phase NDJSON records', () => { + it('should serialize and append phase_start records', () => { + const filepath = initNdjsonLog('sess-phase-001', 'task', 'wf', projectDir); + + const record: NdjsonPhaseStart = { + type: 'phase_start', + step: 'plan', + phase: 1, + phaseName: 'execute', + timestamp: '2025-01-01T00:00:01.000Z', + instruction: 'Do the planning', + }; + appendNdjsonLine(filepath, record); + + const content = readFileSync(filepath, 'utf-8'); + const lines = content.trim().split('\n'); + expect(lines).toHaveLength(2); // workflow_start + phase_start + + const parsed = JSON.parse(lines[1]!) as NdjsonRecord; + expect(parsed.type).toBe('phase_start'); + if (parsed.type === 'phase_start') { + expect(parsed.step).toBe('plan'); + expect(parsed.phase).toBe(1); + expect(parsed.phaseName).toBe('execute'); + expect(parsed.instruction).toBe('Do the planning'); + } + }); + + it('should serialize and append phase_complete records', () => { + const filepath = initNdjsonLog('sess-phase-002', 'task', 'wf', projectDir); + + const record: NdjsonPhaseComplete = { + type: 'phase_complete', + step: 'plan', + phase: 2, + phaseName: 'report', + status: 'done', + content: 'Report output', + timestamp: '2025-01-01T00:00:02.000Z', + }; + appendNdjsonLine(filepath, record); + + const content = readFileSync(filepath, 'utf-8'); + const lines = content.trim().split('\n'); + expect(lines).toHaveLength(2); + + const parsed = JSON.parse(lines[1]!) as NdjsonRecord; + expect(parsed.type).toBe('phase_complete'); + if (parsed.type === 'phase_complete') { + expect(parsed.step).toBe('plan'); + expect(parsed.phase).toBe(2); + expect(parsed.phaseName).toBe('report'); + expect(parsed.status).toBe('done'); + expect(parsed.content).toBe('Report output'); + } + }); + + it('should serialize phase_complete with error', () => { + const filepath = initNdjsonLog('sess-phase-003', 'task', 'wf', projectDir); + + const record: NdjsonPhaseComplete = { + type: 'phase_complete', + step: 'impl', + phase: 3, + phaseName: 'judge', + status: 'error', + timestamp: '2025-01-01T00:00:03.000Z', + error: 'Status judgment phase failed', + }; + appendNdjsonLine(filepath, record); + + const content = readFileSync(filepath, 'utf-8'); + const lines = content.trim().split('\n'); + const parsed = JSON.parse(lines[1]!) as NdjsonRecord; + expect(parsed.type).toBe('phase_complete'); + if (parsed.type === 'phase_complete') { + expect(parsed.error).toBe('Status judgment phase failed'); + expect(parsed.phase).toBe(3); + expect(parsed.phaseName).toBe('judge'); + } + }); + + it('should be skipped by loadNdjsonLog (default case)', () => { + const filepath = initNdjsonLog('sess-phase-004', 'task', 'wf', projectDir); + + // Add phase records + appendNdjsonLine(filepath, { + type: 'phase_start', + step: 'plan', + phase: 1, + phaseName: 'execute', + timestamp: '2025-01-01T00:00:01.000Z', + instruction: 'Plan it', + } satisfies NdjsonPhaseStart); + + appendNdjsonLine(filepath, { + type: 'phase_complete', + step: 'plan', + phase: 1, + phaseName: 'execute', + status: 'done', + content: 'Planned', + timestamp: '2025-01-01T00:00:02.000Z', + } satisfies NdjsonPhaseComplete); + + // Add a step_complete so we can verify history + appendNdjsonLine(filepath, { + type: 'step_complete', + step: 'plan', + agent: 'planner', + status: 'done', + content: 'Plan completed', + instruction: 'Plan it', + timestamp: '2025-01-01T00:00:03.000Z', + } satisfies NdjsonStepComplete); + + const log = loadNdjsonLog(filepath); + expect(log).not.toBeNull(); + // Only step_complete should contribute to history + expect(log!.history).toHaveLength(1); + expect(log!.iterations).toBe(1); + }); + }); + + describe('interactive NDJSON records', () => { + it('should serialize and append interactive_start records', () => { + const filepath = initNdjsonLog('sess-interactive-001', 'task', 'wf', projectDir); + + const record: NdjsonInteractiveStart = { + type: 'interactive_start', + timestamp: '2025-01-01T00:00:01.000Z', + }; + appendNdjsonLine(filepath, record); + + const content = readFileSync(filepath, 'utf-8'); + const lines = content.trim().split('\n'); + expect(lines).toHaveLength(2); + + const parsed = JSON.parse(lines[1]!) as NdjsonRecord; + expect(parsed.type).toBe('interactive_start'); + if (parsed.type === 'interactive_start') { + expect(parsed.timestamp).toBe('2025-01-01T00:00:01.000Z'); + } + }); + + it('should serialize and append interactive_end records', () => { + const filepath = initNdjsonLog('sess-interactive-002', 'task', 'wf', projectDir); + + const record: NdjsonInteractiveEnd = { + type: 'interactive_end', + confirmed: true, + task: 'Build a feature', + timestamp: '2025-01-01T00:00:02.000Z', + }; + appendNdjsonLine(filepath, record); + + const content = readFileSync(filepath, 'utf-8'); + const lines = content.trim().split('\n'); + expect(lines).toHaveLength(2); + + const parsed = JSON.parse(lines[1]!) as NdjsonRecord; + expect(parsed.type).toBe('interactive_end'); + if (parsed.type === 'interactive_end') { + expect(parsed.confirmed).toBe(true); + expect(parsed.task).toBe('Build a feature'); + } + }); + + it('should be skipped by loadNdjsonLog (default case)', () => { + const filepath = initNdjsonLog('sess-interactive-003', 'task', 'wf', projectDir); + + appendNdjsonLine(filepath, { + type: 'interactive_start', + timestamp: '2025-01-01T00:00:01.000Z', + } satisfies NdjsonInteractiveStart); + + appendNdjsonLine(filepath, { + type: 'interactive_end', + confirmed: true, + task: 'Some task', + timestamp: '2025-01-01T00:00:02.000Z', + } satisfies NdjsonInteractiveEnd); + + const log = loadNdjsonLog(filepath); + expect(log).not.toBeNull(); + expect(log!.history).toHaveLength(0); + }); + }); }); diff --git a/src/app/cli/routing.ts b/src/app/cli/routing.ts index 29bdd2c..957fe16 100644 --- a/src/app/cli/routing.ts +++ b/src/app/cli/routing.ts @@ -104,5 +104,6 @@ program selectOptions.interactiveUserInput = true; selectOptions.workflow = workflowId; + selectOptions.interactiveMetadata = { confirmed: result.confirmed, task: result.task }; await selectAndExecuteTask(resolvedCwd, result.task, selectOptions, agentOverrides); }); diff --git a/src/core/workflow/engine/OptionsBuilder.ts b/src/core/workflow/engine/OptionsBuilder.ts index 49faa10..0269693 100644 --- a/src/core/workflow/engine/OptionsBuilder.ts +++ b/src/core/workflow/engine/OptionsBuilder.ts @@ -9,7 +9,7 @@ import { join } from 'node:path'; import type { WorkflowStep, WorkflowState, Language } from '../../models/types.js'; import type { RunAgentOptions } from '../../../agents/runner.js'; import type { PhaseRunnerContext } from '../phase-runner.js'; -import type { WorkflowEngineOptions } from '../types.js'; +import type { WorkflowEngineOptions, PhaseName } from '../types.js'; export class OptionsBuilder { constructor( @@ -75,6 +75,8 @@ export class OptionsBuilder { buildPhaseRunnerContext( state: WorkflowState, updateAgentSession: (agent: string, sessionId: string | undefined) => void, + onPhaseStart?: (step: WorkflowStep, phase: 1 | 2 | 3, phaseName: PhaseName, instruction: string) => void, + onPhaseComplete?: (step: WorkflowStep, phase: 1 | 2 | 3, phaseName: PhaseName, content: string, status: string, error?: string) => void, ): PhaseRunnerContext { return { cwd: this.getCwd(), @@ -84,6 +86,8 @@ export class OptionsBuilder { getSessionId: (agent: string) => state.agentSessions.get(agent), buildResumeOptions: this.buildResumeOptions.bind(this), updateAgentSession, + onPhaseStart, + onPhaseComplete, }; } } diff --git a/src/core/workflow/engine/ParallelRunner.ts b/src/core/workflow/engine/ParallelRunner.ts index 0102091..29d9e53 100644 --- a/src/core/workflow/engine/ParallelRunner.ts +++ b/src/core/workflow/engine/ParallelRunner.ts @@ -18,7 +18,7 @@ import { incrementStepIteration } from './state-manager.js'; import { createLogger } from '../../../shared/utils/index.js'; import type { OptionsBuilder } from './OptionsBuilder.js'; import type { StepExecutor } from './StepExecutor.js'; -import type { WorkflowEngineOptions } from '../types.js'; +import type { WorkflowEngineOptions, PhaseName } from '../types.js'; const log = createLogger('parallel-runner'); @@ -35,6 +35,8 @@ export interface ParallelRunnerDeps { conditions: Array<{ index: number; text: string }>, options: { cwd: string } ) => Promise; + readonly onPhaseStart?: (step: WorkflowStep, phase: 1 | 2 | 3, phaseName: PhaseName, instruction: string) => void; + readonly onPhaseComplete?: (step: WorkflowStep, phase: 1 | 2 | 3, phaseName: PhaseName, content: string, status: string, error?: string) => void; } export class ParallelRunner { @@ -69,7 +71,7 @@ export class ParallelRunner { }) : undefined; - const phaseCtx = this.deps.optionsBuilder.buildPhaseRunnerContext(state, updateAgentSession); + const phaseCtx = this.deps.optionsBuilder.buildPhaseRunnerContext(state, updateAgentSession, this.deps.onPhaseStart, this.deps.onPhaseComplete); const ruleCtx = { state, cwd: this.deps.getCwd(), @@ -93,8 +95,10 @@ export class ParallelRunner { : baseOptions; const subSessionKey = subStep.agent ?? subStep.name; + this.deps.onPhaseStart?.(subStep, 1, 'execute', subInstruction); const subResponse = await runAgent(subStep.agent, subInstruction, agentOptions); updateAgentSession(subSessionKey, subResponse.sessionId); + this.deps.onPhaseComplete?.(subStep, 1, 'execute', subResponse.content, subResponse.status, subResponse.error); // Phase 2: report output for sub-step if (subStep.report) { diff --git a/src/core/workflow/engine/StepExecutor.ts b/src/core/workflow/engine/StepExecutor.ts index 2073032..caa5110 100644 --- a/src/core/workflow/engine/StepExecutor.ts +++ b/src/core/workflow/engine/StepExecutor.ts @@ -14,6 +14,7 @@ import type { AgentResponse, Language, } from '../../models/types.js'; +import type { PhaseName } from '../types.js'; import { runAgent } from '../../../agents/runner.js'; import { InstructionBuilder, isReportObjectConfig } from '../instruction/InstructionBuilder.js'; import { needsStatusJudgmentPhase, runReportPhase, runStatusJudgmentPhase } from '../phase-runner.js'; @@ -38,6 +39,8 @@ export interface StepExecutorDeps { conditions: Array<{ index: number; text: string }>, options: { cwd: string } ) => Promise; + readonly onPhaseStart?: (step: WorkflowStep, phase: 1 | 2 | 3, phaseName: PhaseName, instruction: string) => void; + readonly onPhaseComplete?: (step: WorkflowStep, phase: 1 | 2 | 3, phaseName: PhaseName, content: string, status: string, error?: string) => void; } export class StepExecutor { @@ -99,11 +102,13 @@ export class StepExecutor { }); // Phase 1: main execution (Write excluded if step has report) + this.deps.onPhaseStart?.(step, 1, 'execute', instruction); const agentOptions = this.deps.optionsBuilder.buildAgentOptions(step); let response = await runAgent(step.agent, instruction, agentOptions); updateAgentSession(sessionKey, response.sessionId); + this.deps.onPhaseComplete?.(step, 1, 'execute', response.content, response.status, response.error); - const phaseCtx = this.deps.optionsBuilder.buildPhaseRunnerContext(state, updateAgentSession); + const phaseCtx = this.deps.optionsBuilder.buildPhaseRunnerContext(state, updateAgentSession, this.deps.onPhaseStart, this.deps.onPhaseComplete); // Phase 2: report output (resume same session, Write only) if (step.report) { diff --git a/src/core/workflow/engine/WorkflowEngine.ts b/src/core/workflow/engine/WorkflowEngine.ts index 7e235c2..e47220c 100644 --- a/src/core/workflow/engine/WorkflowEngine.ts +++ b/src/core/workflow/engine/WorkflowEngine.ts @@ -104,6 +104,12 @@ export class WorkflowEngine extends EventEmitter { getWorkflowSteps: () => this.config.steps.map(s => ({ name: s.name, description: s.description })), detectRuleIndex: this.detectRuleIndex, callAiJudge: this.callAiJudge, + onPhaseStart: (step, phase, phaseName, instruction) => { + this.emit('phase:start', step, phase, phaseName, instruction); + }, + onPhaseComplete: (step, phase, phaseName, content, phaseStatus, error) => { + this.emit('phase:complete', step, phase, phaseName, content, phaseStatus, error); + }, }); this.parallelRunner = new ParallelRunner({ @@ -115,6 +121,12 @@ export class WorkflowEngine extends EventEmitter { getInteractive: () => this.options.interactive === true, detectRuleIndex: this.detectRuleIndex, callAiJudge: this.callAiJudge, + onPhaseStart: (step, phase, phaseName, instruction) => { + this.emit('phase:start', step, phase, phaseName, instruction); + }, + onPhaseComplete: (step, phase, phaseName, content, phaseStatus, error) => { + this.emit('phase:complete', step, phase, phaseName, content, phaseStatus, error); + }, }); log.debug('WorkflowEngine initialized', { diff --git a/src/core/workflow/index.ts b/src/core/workflow/index.ts index b1fa0ad..b895d68 100644 --- a/src/core/workflow/index.ts +++ b/src/core/workflow/index.ts @@ -14,6 +14,7 @@ export { COMPLETE_STEP, ABORT_STEP, ERROR_MESSAGES } from './constants.js'; // Types export type { WorkflowEvents, + PhaseName, UserInputRequest, IterationLimitRequest, SessionUpdateCallback, diff --git a/src/core/workflow/phase-runner.ts b/src/core/workflow/phase-runner.ts index 89ea741..21063fd 100644 --- a/src/core/workflow/phase-runner.ts +++ b/src/core/workflow/phase-runner.ts @@ -8,6 +8,7 @@ import { appendFileSync, existsSync, mkdirSync, writeFileSync } from 'node:fs'; import { dirname, resolve, sep } from 'node:path'; import type { WorkflowStep, Language } from '../models/types.js'; +import type { PhaseName } from './types.js'; import { runAgent, type RunAgentOptions } from '../../agents/runner.js'; import { ReportInstructionBuilder } from './instruction/ReportInstructionBuilder.js'; import { StatusJudgmentBuilder } from './instruction/StatusJudgmentBuilder.js'; @@ -32,6 +33,10 @@ export interface PhaseRunnerContext { buildResumeOptions: (step: WorkflowStep, sessionId: string, overrides: Pick) => RunAgentOptions; /** Update agent session after a phase run */ updateAgentSession: (agent: string, sessionId: string | undefined) => void; + /** Callback for phase lifecycle logging */ + onPhaseStart?: (step: WorkflowStep, phase: 1 | 2 | 3, phaseName: PhaseName, instruction: string) => void; + /** Callback for phase completion logging */ + onPhaseComplete?: (step: WorkflowStep, phase: 1 | 2 | 3, phaseName: PhaseName, content: string, status: string, error?: string) => void; } /** @@ -145,16 +150,26 @@ export async function runReportPhase( language: ctx.language, }).build(); + ctx.onPhaseStart?.(step, 2, 'report', reportInstruction); + const reportOptions = ctx.buildResumeOptions(step, sessionId, { allowedTools: [], maxTurns: 3, }); - const reportResponse = await runAgent(step.agent, reportInstruction, reportOptions); + let reportResponse; + try { + reportResponse = await runAgent(step.agent, reportInstruction, reportOptions); + } catch (error) { + const errorMsg = error instanceof Error ? error.message : String(error); + ctx.onPhaseComplete?.(step, 2, 'report', '', 'error', errorMsg); + throw error; + } // Check for errors in report phase if (reportResponse.status !== 'done') { const errorMsg = reportResponse.error || reportResponse.content || 'Unknown error'; + ctx.onPhaseComplete?.(step, 2, 'report', reportResponse.content, reportResponse.status, errorMsg); throw new Error(`Report phase failed: ${errorMsg}`); } @@ -166,6 +181,7 @@ export async function runReportPhase( // Update session (phase 2 may update it) ctx.updateAgentSession(sessionKey, reportResponse.sessionId); + ctx.onPhaseComplete?.(step, 2, 'report', reportResponse.content, reportResponse.status); log.debug('Report phase complete', { step: step.name, status: reportResponse.status }); } @@ -191,22 +207,33 @@ export async function runStatusJudgmentPhase( interactive: ctx.interactive, }).build(); + ctx.onPhaseStart?.(step, 3, 'judge', judgmentInstruction); + const judgmentOptions = ctx.buildResumeOptions(step, sessionId, { allowedTools: [], maxTurns: 3, }); - const judgmentResponse = await runAgent(step.agent, judgmentInstruction, judgmentOptions); + let judgmentResponse; + try { + judgmentResponse = await runAgent(step.agent, judgmentInstruction, judgmentOptions); + } catch (error) { + const errorMsg = error instanceof Error ? error.message : String(error); + ctx.onPhaseComplete?.(step, 3, 'judge', '', 'error', errorMsg); + throw error; + } // Check for errors in status judgment phase if (judgmentResponse.status !== 'done') { const errorMsg = judgmentResponse.error || judgmentResponse.content || 'Unknown error'; + ctx.onPhaseComplete?.(step, 3, 'judge', judgmentResponse.content, judgmentResponse.status, errorMsg); throw new Error(`Status judgment phase failed: ${errorMsg}`); } // Update session (phase 3 may update it) ctx.updateAgentSession(sessionKey, judgmentResponse.sessionId); + ctx.onPhaseComplete?.(step, 3, 'judge', judgmentResponse.content, judgmentResponse.status); log.debug('Status judgment phase complete', { step: step.name, status: judgmentResponse.status }); return judgmentResponse.content; } diff --git a/src/core/workflow/types.ts b/src/core/workflow/types.ts index 081f5e4..f59d2f8 100644 --- a/src/core/workflow/types.ts +++ b/src/core/workflow/types.ts @@ -104,6 +104,8 @@ export type AiJudgeCaller = ( options: { cwd: string } ) => Promise; +export type PhaseName = 'execute' | 'report' | 'judge'; + /** Events emitted by workflow engine */ export interface WorkflowEvents { 'step:start': (step: WorkflowStep, iteration: number, instruction: string) => void; @@ -111,6 +113,8 @@ export interface WorkflowEvents { 'step:report': (step: WorkflowStep, filePath: string, fileName: string) => void; 'step:blocked': (step: WorkflowStep, response: AgentResponse) => void; 'step:user_input': (step: WorkflowStep, userInput: string) => void; + 'phase:start': (step: WorkflowStep, phase: 1 | 2 | 3, phaseName: PhaseName, instruction: string) => void; + 'phase:complete': (step: WorkflowStep, phase: 1 | 2 | 3, phaseName: PhaseName, content: string, status: string, error?: string) => void; 'workflow:complete': (state: WorkflowState) => void; 'workflow:abort': (state: WorkflowState, reason: string) => void; 'iteration:limit': (iteration: number, maxIterations: number) => void; diff --git a/src/features/tasks/execute/selectAndExecute.ts b/src/features/tasks/execute/selectAndExecute.ts index b0cf04a..274dc61 100644 --- a/src/features/tasks/execute/selectAndExecute.ts +++ b/src/features/tasks/execute/selectAndExecute.ts @@ -153,6 +153,7 @@ export async function selectAndExecuteTask( projectCwd: cwd, agentOverrides, interactiveUserInput: options?.interactiveUserInput === true, + interactiveMetadata: options?.interactiveMetadata, }); if (taskSuccess && isWorktree) { diff --git a/src/features/tasks/execute/taskExecution.ts b/src/features/tasks/execute/taskExecution.ts index 6a02ebc..f782f3a 100644 --- a/src/features/tasks/execute/taskExecution.ts +++ b/src/features/tasks/execute/taskExecution.ts @@ -25,7 +25,7 @@ const log = createLogger('task'); * Execute a single task with workflow. */ export async function executeTask(options: ExecuteTaskOptions): Promise { - const { task, cwd, workflowIdentifier, projectCwd, agentOverrides, interactiveUserInput } = options; + const { task, cwd, workflowIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata } = options; const workflowConfig = loadWorkflowByIdentifier(workflowIdentifier, projectCwd); if (!workflowConfig) { @@ -51,6 +51,7 @@ export async function executeTask(options: ExecuteTaskOptions): Promise provider: agentOverrides?.provider, model: agentOverrides?.model, interactiveUserInput, + interactiveMetadata, }); return result.success; } diff --git a/src/features/tasks/execute/types.ts b/src/features/tasks/execute/types.ts index 5b3b782..d2c4f98 100644 --- a/src/features/tasks/execute/types.ts +++ b/src/features/tasks/execute/types.ts @@ -11,6 +11,14 @@ export interface WorkflowExecutionResult { reason?: string; } +/** Metadata from interactive mode, passed through to NDJSON logging */ +export interface InteractiveMetadata { + /** Whether the user confirmed with /go */ + confirmed: boolean; + /** The assembled task text (only meaningful when confirmed=true) */ + task?: string; +} + /** Options for workflow execution */ export interface WorkflowExecutionOptions { /** Header prefix for display */ @@ -23,6 +31,8 @@ export interface WorkflowExecutionOptions { model?: string; /** Enable interactive user input during step transitions */ interactiveUserInput?: boolean; + /** Interactive mode result metadata for NDJSON logging */ + interactiveMetadata?: InteractiveMetadata; } export interface TaskExecutionOptions { @@ -43,6 +53,8 @@ export interface ExecuteTaskOptions { agentOverrides?: TaskExecutionOptions; /** Enable interactive user input during step transitions */ interactiveUserInput?: boolean; + /** Interactive mode result metadata for NDJSON logging */ + interactiveMetadata?: InteractiveMetadata; } export interface PipelineExecutionOptions { @@ -79,4 +91,6 @@ export interface SelectAndExecuteOptions { createWorktree?: boolean | undefined; /** Enable interactive user input during step transitions */ interactiveUserInput?: boolean; + /** Interactive mode result metadata for NDJSON logging */ + interactiveMetadata?: InteractiveMetadata; } diff --git a/src/features/tasks/execute/workflowExecution.ts b/src/features/tasks/execute/workflowExecution.ts index a772ca8..a9deeea 100644 --- a/src/features/tasks/execute/workflowExecution.ts +++ b/src/features/tasks/execute/workflowExecution.ts @@ -39,6 +39,10 @@ import { type NdjsonStepComplete, type NdjsonWorkflowComplete, type NdjsonWorkflowAbort, + type NdjsonPhaseStart, + type NdjsonPhaseComplete, + type NdjsonInteractiveStart, + type NdjsonInteractiveEnd, } from '../../../infra/fs/index.js'; import { createLogger, notifySuccess, notifyError } from '../../../shared/utils/index.js'; import { selectOption, promptInput } from '../../../shared/prompt/index.js'; @@ -94,6 +98,23 @@ export async function executeWorkflow( const ndjsonLogPath = initNdjsonLog(workflowSessionId, task, workflowConfig.name, projectCwd); updateLatestPointer(sessionLog, workflowSessionId, projectCwd, { copyToPrevious: true }); + // Write interactive mode records if interactive mode was used before this workflow + if (options.interactiveMetadata) { + const startRecord: NdjsonInteractiveStart = { + type: 'interactive_start', + timestamp: new Date().toISOString(), + }; + appendNdjsonLine(ndjsonLogPath, startRecord); + + const endRecord: NdjsonInteractiveEnd = { + type: 'interactive_end', + confirmed: options.interactiveMetadata.confirmed, + ...(options.interactiveMetadata.task ? { task: options.interactiveMetadata.task } : {}), + timestamp: new Date().toISOString(), + }; + appendNdjsonLine(ndjsonLogPath, endRecord); + } + // Track current display for streaming const displayRef: { current: StreamDisplay | null } = { current: null }; @@ -199,6 +220,34 @@ export async function executeWorkflow( let abortReason: string | undefined; + engine.on('phase:start', (step, phase, phaseName, instruction) => { + log.debug('Phase starting', { step: step.name, phase, phaseName }); + const record: NdjsonPhaseStart = { + type: 'phase_start', + step: step.name, + phase, + phaseName, + timestamp: new Date().toISOString(), + ...(instruction ? { instruction } : {}), + }; + appendNdjsonLine(ndjsonLogPath, record); + }); + + engine.on('phase:complete', (step, phase, phaseName, content, phaseStatus, phaseError) => { + log.debug('Phase completed', { step: step.name, phase, phaseName, status: phaseStatus }); + const record: NdjsonPhaseComplete = { + type: 'phase_complete', + step: step.name, + phase, + phaseName, + status: phaseStatus, + ...(content ? { content } : {}), + timestamp: new Date().toISOString(), + ...(phaseError ? { error: phaseError } : {}), + }; + appendNdjsonLine(ndjsonLogPath, record); + }); + engine.on('step:start', (step, iteration, instruction) => { log.debug('Step starting', { step: step.name, agent: step.agentDisplayName, iteration }); info(`[${iteration}/${workflowConfig.maxIterations}] ${step.name} (${step.agentDisplayName})`); diff --git a/src/infra/fs/index.ts b/src/infra/fs/index.ts index 933a7b9..f796155 100644 --- a/src/infra/fs/index.ts +++ b/src/infra/fs/index.ts @@ -9,6 +9,10 @@ export type { NdjsonStepComplete, NdjsonWorkflowComplete, NdjsonWorkflowAbort, + NdjsonPhaseStart, + NdjsonPhaseComplete, + NdjsonInteractiveStart, + NdjsonInteractiveEnd, NdjsonRecord, LatestLogPointer, } from './session.js'; diff --git a/src/infra/fs/session.ts b/src/infra/fs/session.ts index 4c233c7..9281104 100644 --- a/src/infra/fs/session.ts +++ b/src/infra/fs/session.ts @@ -21,6 +21,10 @@ export type { NdjsonStepComplete, NdjsonWorkflowComplete, NdjsonWorkflowAbort, + NdjsonPhaseStart, + NdjsonPhaseComplete, + NdjsonInteractiveStart, + NdjsonInteractiveEnd, NdjsonRecord, LatestLogPointer, } from '../../shared/utils/index.js'; diff --git a/src/shared/utils/types.ts b/src/shared/utils/types.ts index edf470f..8d27528 100644 --- a/src/shared/utils/types.ts +++ b/src/shared/utils/types.ts @@ -73,12 +73,48 @@ export interface NdjsonWorkflowAbort { endTime: string; } +export interface NdjsonPhaseStart { + type: 'phase_start'; + step: string; + phase: 1 | 2 | 3; + phaseName: 'execute' | 'report' | 'judge'; + timestamp: string; + instruction?: string; +} + +export interface NdjsonPhaseComplete { + type: 'phase_complete'; + step: string; + phase: 1 | 2 | 3; + phaseName: 'execute' | 'report' | 'judge'; + status: string; + content?: string; + timestamp: string; + error?: string; +} + +export interface NdjsonInteractiveStart { + type: 'interactive_start'; + timestamp: string; +} + +export interface NdjsonInteractiveEnd { + type: 'interactive_end'; + confirmed: boolean; + task?: string; + timestamp: string; +} + export type NdjsonRecord = | NdjsonWorkflowStart | NdjsonStepStart | NdjsonStepComplete | NdjsonWorkflowComplete - | NdjsonWorkflowAbort; + | NdjsonWorkflowAbort + | NdjsonPhaseStart + | NdjsonPhaseComplete + | NdjsonInteractiveStart + | NdjsonInteractiveEnd; // --- Conversation log types ---