diff --git a/src/__tests__/engine-blocked.test.ts b/src/__tests__/engine-blocked.test.ts index 28c7ace..8cf10e6 100644 --- a/src/__tests__/engine-blocked.test.ts +++ b/src/__tests__/engine-blocked.test.ts @@ -141,4 +141,28 @@ describe('PieceEngine Integration: Blocked Handling', () => { expect(userInputFn).toHaveBeenCalledOnce(); expect(state.userInputs).toContain('User provided clarification'); }); + + it('should abort immediately when movement returns error status', async () => { + const config = buildDefaultPieceConfig(); + const onUserInput = vi.fn().mockResolvedValueOnce('should not be called'); + const engine = new PieceEngine(config, tmpDir, 'test task', { projectCwd: tmpDir, onUserInput }); + + mockRunAgentSequence([ + makeResponse({ persona: 'plan', status: 'error', content: 'Transport error', error: 'Transport error' }), + ]); + + mockDetectMatchedRuleSequence([ + { index: 0, method: 'phase1_tag' }, + ]); + + const abortFn = vi.fn(); + engine.on('piece:abort', abortFn); + + const state = await engine.run(); + + expect(state.status).toBe('aborted'); + expect(onUserInput).not.toHaveBeenCalled(); + expect(abortFn).toHaveBeenCalledWith(expect.anything(), expect.stringContaining('Transport error')); + }); + }); diff --git a/src/__tests__/engine-parallel-failure.test.ts b/src/__tests__/engine-parallel-failure.test.ts index a60dc9d..3d3d00e 100644 --- a/src/__tests__/engine-parallel-failure.test.ts +++ b/src/__tests__/engine-parallel-failure.test.ts @@ -4,7 +4,7 @@ * Covers: * - One sub-movement fails while another succeeds → piece continues * - All sub-movements fail → piece aborts - * - Failed sub-movement is recorded as blocked with error + * - Failed sub-movement is recorded as error with error message */ import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; @@ -141,10 +141,10 @@ describe('PieceEngine Integration: Parallel Movement Partial Failure', () => { expect(state.status).toBe('completed'); - // arch-review should be recorded as blocked + // arch-review should be recorded as error const archReviewOutput = state.movementOutputs.get('arch-review'); expect(archReviewOutput).toBeDefined(); - expect(archReviewOutput!.status).toBe('blocked'); + expect(archReviewOutput!.status).toBe('error'); expect(archReviewOutput!.error).toContain('exit'); // security-review should be recorded as done diff --git a/src/__tests__/models.test.ts b/src/__tests__/models.test.ts index 7b9e179..db67628 100644 --- a/src/__tests__/models.test.ts +++ b/src/__tests__/models.test.ts @@ -33,6 +33,7 @@ describe('StatusSchema', () => { expect(StatusSchema.parse('approved')).toBe('approved'); expect(StatusSchema.parse('rejected')).toBe('rejected'); expect(StatusSchema.parse('blocked')).toBe('blocked'); + expect(StatusSchema.parse('error')).toBe('error'); expect(StatusSchema.parse('answer')).toBe('answer'); }); diff --git a/src/core/models/schemas.ts b/src/core/models/schemas.ts index 9b0dcbe..df0d6b8 100644 --- a/src/core/models/schemas.ts +++ b/src/core/models/schemas.ts @@ -48,6 +48,7 @@ export const StatusSchema = z.enum([ 'pending', 'done', 'blocked', + 'error', 'approved', 'rejected', 'improve', diff --git a/src/core/models/status.ts b/src/core/models/status.ts index adb0a64..fb77af9 100644 --- a/src/core/models/status.ts +++ b/src/core/models/status.ts @@ -10,6 +10,7 @@ export type Status = | 'pending' | 'done' | 'blocked' + | 'error' | 'approved' | 'rejected' | 'improve' diff --git a/src/core/piece/engine/ParallelRunner.ts b/src/core/piece/engine/ParallelRunner.ts index a6f9ba5..de0ca70 100644 --- a/src/core/piece/engine/ParallelRunner.ts +++ b/src/core/piece/engine/ParallelRunner.ts @@ -131,7 +131,7 @@ export class ParallelRunner { }), ); - // Map settled results: fulfilled → as-is, rejected → blocked AgentResponse + // Map settled results: fulfilled → as-is, rejected → error AgentResponse const subResults = settled.map((result, index) => { if (result.status === 'fulfilled') { return result.value; @@ -139,15 +139,15 @@ export class ParallelRunner { const failedMovement = subMovements[index]!; const errorMsg = getErrorMessage(result.reason); log.error('Sub-movement failed', { movement: failedMovement.name, error: errorMsg }); - const blockedResponse: AgentResponse = { + const errorResponse: AgentResponse = { persona: failedMovement.name, - status: 'blocked', + status: 'error', content: '', timestamp: new Date(), error: errorMsg, }; - state.movementOutputs.set(failedMovement.name, blockedResponse); - return { subMovement: failedMovement, response: blockedResponse, instruction: '' }; + state.movementOutputs.set(failedMovement.name, errorResponse); + return { subMovement: failedMovement, response: errorResponse, instruction: '' }; }); // If all sub-movements failed (error-originated), throw diff --git a/src/core/piece/engine/PieceEngine.ts b/src/core/piece/engine/PieceEngine.ts index d04c984..d7f5071 100644 --- a/src/core/piece/engine/PieceEngine.ts +++ b/src/core/piece/engine/PieceEngine.ts @@ -522,6 +522,13 @@ export class PieceEngine extends EventEmitter { break; } + if (response.status === 'error') { + const detail = response.error ?? response.content ?? `Movement "${movement.name}" returned error status`; + this.state.status = 'aborted'; + this.emit('piece:abort', this.state, `Movement "${movement.name}" failed: ${detail}`); + break; + } + let nextMovement = this.resolveNextMovement(movement, response); log.debug('Movement transition', { from: movement.name, diff --git a/src/features/interactive/conversationLoop.ts b/src/features/interactive/conversationLoop.ts index 45c3989..1819ceb 100644 --- a/src/features/interactive/conversationLoop.ts +++ b/src/features/interactive/conversationLoop.ts @@ -109,7 +109,7 @@ export async function callAIWithRetry( onStream: display.createHandler(), }); display.flush(); - const success = response.status !== 'blocked'; + const success = response.status !== 'blocked' && response.status !== 'error'; if (!success && sessionId) { log.info('Session invalid, retrying without session'); @@ -129,7 +129,7 @@ export async function callAIWithRetry( updatePersonaSession(cwd, ctx.personaName, sessionId, ctx.providerType); } return { - result: { content: retry.content, sessionId: retry.sessionId, success: retry.status !== 'blocked' }, + result: { content: retry.content, sessionId: retry.sessionId, success: retry.status !== 'blocked' && retry.status !== 'error' }, sessionId, }; } diff --git a/src/infra/claude/client.ts b/src/infra/claude/client.ts index 9ef9729..85fc418 100644 --- a/src/infra/claude/client.ts +++ b/src/infra/claude/client.ts @@ -29,7 +29,7 @@ export class ClaudeClient { if (result.interrupted) { return 'interrupted'; } - return 'blocked'; + return 'error'; } return 'done'; } @@ -146,7 +146,7 @@ export class ClaudeClient { return { persona: `skill:${skillName}`, - status: result.success ? 'done' : 'blocked', + status: result.success ? 'done' : 'error', content: result.content, timestamp: new Date(), sessionId: result.sessionId, diff --git a/src/infra/codex/client.ts b/src/infra/codex/client.ts index aed7248..e0d43ec 100644 --- a/src/infra/codex/client.ts +++ b/src/infra/codex/client.ts @@ -25,6 +25,18 @@ export type { CodexCallOptions } from './types.js'; const log = createLogger('codex-sdk'); const CODEX_STREAM_IDLE_TIMEOUT_MS = 10 * 60 * 1000; const CODEX_STREAM_ABORTED_MESSAGE = 'Codex execution aborted'; +const CODEX_RETRY_MAX_ATTEMPTS = 3; +const CODEX_RETRY_BASE_DELAY_MS = 250; +const CODEX_RETRYABLE_ERROR_PATTERNS = [ + 'stream disconnected before completion', + 'transport error', + 'network error', + 'error decoding response body', + 'econnreset', + 'etimedout', + 'eai_again', + 'fetch failed', +]; /** * Client for Codex SDK agent interactions. @@ -33,13 +45,49 @@ const CODEX_STREAM_ABORTED_MESSAGE = 'Codex execution aborted'; * and response processing. */ export class CodexClient { + private isRetriableError(message: string, aborted: boolean, abortCause?: 'timeout' | 'external'): boolean { + if (aborted || abortCause) { + return false; + } + + const lower = message.toLowerCase(); + return CODEX_RETRYABLE_ERROR_PATTERNS.some((pattern) => lower.includes(pattern)); + } + + private async waitForRetryDelay(attempt: number, signal?: AbortSignal): Promise { + const delayMs = CODEX_RETRY_BASE_DELAY_MS * (2 ** Math.max(0, attempt - 1)); + await new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + if (signal) { + signal.removeEventListener('abort', onAbort); + } + resolve(); + }, delayMs); + + const onAbort = (): void => { + clearTimeout(timeoutId); + if (signal) { + signal.removeEventListener('abort', onAbort); + } + reject(new Error(CODEX_STREAM_ABORTED_MESSAGE)); + }; + + if (signal) { + if (signal.aborted) { + onAbort(); + return; + } + signal.addEventListener('abort', onAbort, { once: true }); + } + }); + } + /** Call Codex with an agent prompt */ async call( agentType: string, prompt: string, options: CodexCallOptions, ): Promise { - const codex = new Codex(options.openaiApiKey ? { apiKey: options.openaiApiKey } : undefined); const sandboxMode = options.permissionMode ? mapToCodexSandboxMode(options.permissionMode) : 'workspace-write'; @@ -48,186 +96,213 @@ export class CodexClient { workingDirectory: options.cwd, sandboxMode, }; - const thread = options.sessionId - ? await codex.resumeThread(options.sessionId, threadOptions) - : await codex.startThread(threadOptions); - let threadId = extractThreadId(thread) || options.sessionId; + let threadId = options.sessionId; const fullPrompt = options.systemPrompt ? `${options.systemPrompt}\n\n${prompt}` : prompt; - let idleTimeoutId: ReturnType | undefined; - const streamAbortController = new AbortController(); - const timeoutMessage = `Codex stream timed out after ${Math.floor(CODEX_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`; - let abortCause: 'timeout' | 'external' | undefined; + for (let attempt = 1; attempt <= CODEX_RETRY_MAX_ATTEMPTS; attempt++) { + const codex = new Codex(options.openaiApiKey ? { apiKey: options.openaiApiKey } : undefined); + const thread = threadId + ? await codex.resumeThread(threadId, threadOptions) + : await codex.startThread(threadOptions); + let currentThreadId = extractThreadId(thread) || threadId; - const resetIdleTimeout = (): void => { - if (idleTimeoutId !== undefined) { - clearTimeout(idleTimeoutId); - } - idleTimeoutId = setTimeout(() => { - abortCause = 'timeout'; + let idleTimeoutId: ReturnType | undefined; + const streamAbortController = new AbortController(); + const timeoutMessage = `Codex stream timed out after ${Math.floor(CODEX_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`; + let abortCause: 'timeout' | 'external' | undefined; + + const resetIdleTimeout = (): void => { + if (idleTimeoutId !== undefined) { + clearTimeout(idleTimeoutId); + } + idleTimeoutId = setTimeout(() => { + abortCause = 'timeout'; + streamAbortController.abort(); + }, CODEX_STREAM_IDLE_TIMEOUT_MS); + }; + + const onExternalAbort = (): void => { + abortCause = 'external'; streamAbortController.abort(); - }, CODEX_STREAM_IDLE_TIMEOUT_MS); - }; + }; - const onExternalAbort = (): void => { - abortCause = 'external'; - streamAbortController.abort(); - }; - - if (options.abortSignal) { - if (options.abortSignal.aborted) { - streamAbortController.abort(); - } else { - options.abortSignal.addEventListener('abort', onExternalAbort, { once: true }); + if (options.abortSignal) { + if (options.abortSignal.aborted) { + streamAbortController.abort(); + } else { + options.abortSignal.addEventListener('abort', onExternalAbort, { once: true }); + } } - } - try { - log.debug('Executing Codex thread', { - agentType, - model: options.model, - hasSystemPrompt: !!options.systemPrompt, - }); + try { + log.debug('Executing Codex thread', { + agentType, + model: options.model, + hasSystemPrompt: !!options.systemPrompt, + attempt, + }); - const { events } = await thread.runStreamed(fullPrompt, { - signal: streamAbortController.signal, - }); - resetIdleTimeout(); - let content = ''; - const contentOffsets = new Map(); - let success = true; - let failureMessage = ''; - const state = createStreamTrackingState(); - - for await (const event of events as AsyncGenerator) { + const { events } = await thread.runStreamed(fullPrompt, { + signal: streamAbortController.signal, + }); resetIdleTimeout(); - if (event.type === 'thread.started') { - threadId = typeof event.thread_id === 'string' ? event.thread_id : threadId; - emitInit(options.onStream, options.model, threadId); - continue; - } - if (event.type === 'turn.failed') { - success = false; - if (event.error && typeof event.error === 'object' && 'message' in event.error) { - failureMessage = String((event.error as { message?: unknown }).message ?? ''); + let content = ''; + const contentOffsets = new Map(); + let success = true; + let failureMessage = ''; + const state = createStreamTrackingState(); + + for await (const event of events as AsyncGenerator) { + resetIdleTimeout(); + + if (event.type === 'thread.started') { + currentThreadId = typeof event.thread_id === 'string' ? event.thread_id : currentThreadId; + emitInit(options.onStream, options.model, currentThreadId); + continue; } - break; - } - if (event.type === 'error') { - success = false; - failureMessage = typeof event.message === 'string' ? event.message : 'Unknown error'; - break; - } - - if (event.type === 'item.started') { - const item = event.item as CodexItem | undefined; - if (item) { - emitCodexItemStart(item, options.onStream, state.startedItems); + if (event.type === 'turn.failed') { + success = false; + if (event.error && typeof event.error === 'object' && 'message' in event.error) { + failureMessage = String((event.error as { message?: unknown }).message ?? ''); + } + break; } - continue; - } - if (event.type === 'item.updated') { - const item = event.item as CodexItem | undefined; - if (item) { - if (item.type === 'agent_message' && typeof item.text === 'string') { - const itemId = item.id; - const text = item.text; - if (itemId) { - const prev = contentOffsets.get(itemId) ?? 0; - if (text.length > prev) { - if (prev === 0 && content.length > 0) { - content += '\n'; + if (event.type === 'error') { + success = false; + failureMessage = typeof event.message === 'string' ? event.message : 'Unknown error'; + break; + } + + if (event.type === 'item.started') { + const item = event.item as CodexItem | undefined; + if (item) { + emitCodexItemStart(item, options.onStream, state.startedItems); + } + continue; + } + + if (event.type === 'item.updated') { + const item = event.item as CodexItem | undefined; + if (item) { + if (item.type === 'agent_message' && typeof item.text === 'string') { + const itemId = item.id; + const text = item.text; + if (itemId) { + const prev = contentOffsets.get(itemId) ?? 0; + if (text.length > prev) { + if (prev === 0 && content.length > 0) { + content += '\n'; + } + content += text.slice(prev); + contentOffsets.set(itemId, text.length); } - content += text.slice(prev); - contentOffsets.set(itemId, text.length); } } + emitCodexItemUpdate(item, options.onStream, state); } - emitCodexItemUpdate(item, options.onStream, state); + continue; } - continue; - } - if (event.type === 'item.completed') { - const item = event.item as CodexItem | undefined; - if (item) { - if (item.type === 'agent_message' && typeof item.text === 'string') { - const itemId = item.id; - const text = item.text; - if (itemId) { - const prev = contentOffsets.get(itemId) ?? 0; - if (text.length > prev) { - if (prev === 0 && content.length > 0) { + if (event.type === 'item.completed') { + const item = event.item as CodexItem | undefined; + if (item) { + if (item.type === 'agent_message' && typeof item.text === 'string') { + const itemId = item.id; + const text = item.text; + if (itemId) { + const prev = contentOffsets.get(itemId) ?? 0; + if (text.length > prev) { + if (prev === 0 && content.length > 0) { + content += '\n'; + } + content += text.slice(prev); + contentOffsets.set(itemId, text.length); + } + } else if (text) { + if (content.length > 0) { content += '\n'; } - content += text.slice(prev); - contentOffsets.set(itemId, text.length); + content += text; } - } else if (text) { - if (content.length > 0) { - content += '\n'; - } - content += text; } + emitCodexItemCompleted(item, options.onStream, state); } - emitCodexItemCompleted(item, options.onStream, state); + continue; } - continue; } - } - if (!success) { - const message = failureMessage || 'Codex execution failed'; - emitResult(options.onStream, false, message, threadId); + if (!success) { + const message = failureMessage || 'Codex execution failed'; + const retriable = this.isRetriableError(message, streamAbortController.signal.aborted, abortCause); + if (retriable && attempt < CODEX_RETRY_MAX_ATTEMPTS) { + log.info('Retrying Codex call after transient failure', { agentType, attempt, message }); + threadId = currentThreadId; + await this.waitForRetryDelay(attempt, options.abortSignal); + continue; + } + + emitResult(options.onStream, false, message, currentThreadId); + return { + persona: agentType, + status: 'error', + content: message, + timestamp: new Date(), + sessionId: currentThreadId, + }; + } + + const trimmed = content.trim(); + emitResult(options.onStream, true, trimmed, currentThreadId); + return { persona: agentType, - status: 'blocked', - content: message, + status: 'done', + content: trimmed, timestamp: new Date(), - sessionId: threadId, + sessionId: currentThreadId, }; - } + } catch (error) { + const message = getErrorMessage(error); + const errorMessage = streamAbortController.signal.aborted + ? abortCause === 'timeout' + ? timeoutMessage + : CODEX_STREAM_ABORTED_MESSAGE + : message; - const trimmed = content.trim(); - emitResult(options.onStream, true, trimmed, threadId); + const retriable = this.isRetriableError(errorMessage, streamAbortController.signal.aborted, abortCause); + if (retriable && attempt < CODEX_RETRY_MAX_ATTEMPTS) { + log.info('Retrying Codex call after transient exception', { agentType, attempt, errorMessage }); + threadId = currentThreadId; + await this.waitForRetryDelay(attempt, options.abortSignal); + continue; + } - return { - persona: agentType, - status: 'done', - content: trimmed, - timestamp: new Date(), - sessionId: threadId, - }; - } catch (error) { - const message = getErrorMessage(error); - const errorMessage = streamAbortController.signal.aborted - ? abortCause === 'timeout' - ? timeoutMessage - : CODEX_STREAM_ABORTED_MESSAGE - : message; - emitResult(options.onStream, false, errorMessage, threadId); + emitResult(options.onStream, false, errorMessage, currentThreadId); - return { - persona: agentType, - status: 'blocked', - content: errorMessage, - timestamp: new Date(), - sessionId: threadId, - }; - } finally { - if (idleTimeoutId !== undefined) { - clearTimeout(idleTimeoutId); - } - if (options.abortSignal) { - options.abortSignal.removeEventListener('abort', onExternalAbort); + return { + persona: agentType, + status: 'error', + content: errorMessage, + timestamp: new Date(), + sessionId: currentThreadId, + }; + } finally { + if (idleTimeoutId !== undefined) { + clearTimeout(idleTimeoutId); + } + if (options.abortSignal) { + options.abortSignal.removeEventListener('abort', onExternalAbort); + } } } + + throw new Error('Unreachable: Codex retry loop exhausted without returning'); } /** Call Codex with a custom agent configuration (system prompt + prompt) */ diff --git a/src/infra/mock/scenario.ts b/src/infra/mock/scenario.ts index 8eb3e5c..3271e32 100644 --- a/src/infra/mock/scenario.ts +++ b/src/infra/mock/scenario.ts @@ -130,7 +130,7 @@ function validateEntry(entry: unknown, index: number): ScenarioEntry { } // status defaults to 'done' - const validStatuses = ['done', 'blocked', 'approved', 'rejected', 'improve'] as const; + const validStatuses = ['done', 'blocked', 'error', 'approved', 'rejected', 'improve'] as const; const status = obj.status ?? 'done'; if (typeof status !== 'string' || !validStatuses.includes(status as typeof validStatuses[number])) { throw new Error( diff --git a/src/infra/mock/types.ts b/src/infra/mock/types.ts index b1c5386..f55b2bb 100644 --- a/src/infra/mock/types.ts +++ b/src/infra/mock/types.ts @@ -12,7 +12,7 @@ export interface MockCallOptions { /** Fixed response content (optional, defaults to generic mock response) */ mockResponse?: string; /** Fixed status to return (optional, defaults to 'done') */ - mockStatus?: 'done' | 'blocked' | 'approved' | 'rejected' | 'improve'; + mockStatus?: 'done' | 'blocked' | 'error' | 'approved' | 'rejected' | 'improve'; } /** A single entry in a mock scenario */ @@ -20,7 +20,7 @@ export interface ScenarioEntry { /** Persona name to match (optional — if omitted, consumed by call order) */ persona?: string; /** Response status */ - status: 'done' | 'blocked' | 'approved' | 'rejected' | 'improve'; + status: 'done' | 'blocked' | 'error' | 'approved' | 'rejected' | 'improve'; /** Response content body */ content: string; } diff --git a/src/infra/providers/codex.ts b/src/infra/providers/codex.ts index 4dcbdcd..88c67d2 100644 --- a/src/infra/providers/codex.ts +++ b/src/infra/providers/codex.ts @@ -36,10 +36,10 @@ function toCodexOptions(options: ProviderCallOptions): CodexCallOptions { }; } -function blockedResponse(agentName: string): AgentResponse { +function errorResponse(agentName: string): AgentResponse { return { persona: agentName, - status: 'blocked', + status: 'error', content: NOT_GIT_REPO_MESSAGE, timestamp: new Date(), }; @@ -59,7 +59,7 @@ export class CodexProvider implements Provider { if (systemPrompt) { return { call: async (prompt: string, options: ProviderCallOptions): Promise => { - if (!isInsideGitRepo(options.cwd)) return blockedResponse(name); + if (!isInsideGitRepo(options.cwd)) return errorResponse(name); return callCodexCustom(name, prompt, systemPrompt, toCodexOptions(options)); }, }; @@ -67,7 +67,7 @@ export class CodexProvider implements Provider { return { call: async (prompt: string, options: ProviderCallOptions): Promise => { - if (!isInsideGitRepo(options.cwd)) return blockedResponse(name); + if (!isInsideGitRepo(options.cwd)) return errorResponse(name); return callCodex(name, prompt, toCodexOptions(options)); }, };