プロバイダーエラーを blocked から error ステータスに分離し、Codex にリトライ機構を追加

blocked はユーザー入力で解決可能な状態、error はプロバイダー障害として意味を明確化。
PieceEngine で error ステータスを検知して即座に abort する。
Codex クライアントにトランジェントエラー(stream disconnected, transport error 等)の
指数バックオフリトライ(最大3回)を追加。
This commit is contained in:
nrslib 2026-02-09 22:04:52 +09:00
parent 8e0257e747
commit 222560a96a
13 changed files with 269 additions and 160 deletions

View File

@ -141,4 +141,28 @@ describe('PieceEngine Integration: Blocked Handling', () => {
expect(userInputFn).toHaveBeenCalledOnce();
expect(state.userInputs).toContain('User provided clarification');
});
// A movement that comes back with status 'error' must end the run as 'aborted'
// without ever calling the onUserInput callback.
it('should abort immediately when movement returns error status', async () => {
  const pieceConfig = buildDefaultPieceConfig();
  const userInputSpy = vi.fn().mockResolvedValueOnce('should not be called');
  const engine = new PieceEngine(pieceConfig, tmpDir, 'test task', {
    projectCwd: tmpDir,
    onUserInput: userInputSpy,
  });

  // The only scripted agent response is an error-status one.
  const failedResponse = makeResponse({
    persona: 'plan',
    status: 'error',
    content: 'Transport error',
    error: 'Transport error',
  });
  mockRunAgentSequence([failedResponse]);
  mockDetectMatchedRuleSequence([{ index: 0, method: 'phase1_tag' }]);

  const abortListener = vi.fn();
  engine.on('piece:abort', abortListener);

  const finalState = await engine.run();

  expect(finalState.status).toBe('aborted');
  expect(userInputSpy).not.toHaveBeenCalled();
  // The abort reason (second event argument) must carry the provider's message.
  expect(abortListener).toHaveBeenCalledWith(
    expect.anything(),
    expect.stringContaining('Transport error'),
  );
});
});

View File

@ -4,7 +4,7 @@
* Covers:
* - One sub-movement fails while another succeeds piece continues
* - All sub-movements fail piece aborts
* - Failed sub-movement is recorded as blocked with error
* - Failed sub-movement is recorded as error with error message
*/
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
@ -141,10 +141,10 @@ describe('PieceEngine Integration: Parallel Movement Partial Failure', () => {
expect(state.status).toBe('completed');
// arch-review should be recorded as blocked
// arch-review should be recorded as error
const archReviewOutput = state.movementOutputs.get('arch-review');
expect(archReviewOutput).toBeDefined();
expect(archReviewOutput!.status).toBe('blocked');
expect(archReviewOutput!.status).toBe('error');
expect(archReviewOutput!.error).toContain('exit');
// security-review should be recorded as done

View File

@ -33,6 +33,7 @@ describe('StatusSchema', () => {
expect(StatusSchema.parse('approved')).toBe('approved');
expect(StatusSchema.parse('rejected')).toBe('rejected');
expect(StatusSchema.parse('blocked')).toBe('blocked');
expect(StatusSchema.parse('error')).toBe('error');
expect(StatusSchema.parse('answer')).toBe('answer');
});

View File

@ -48,6 +48,7 @@ export const StatusSchema = z.enum([
'pending',
'done',
'blocked',
'error',
'approved',
'rejected',
'improve',

View File

@ -10,6 +10,7 @@ export type Status =
| 'pending'
| 'done'
| 'blocked'
| 'error'
| 'approved'
| 'rejected'
| 'improve'

View File

@ -131,7 +131,7 @@ export class ParallelRunner {
}),
);
// Map settled results: fulfilled → as-is, rejected → blocked AgentResponse
// Map settled results: fulfilled → as-is, rejected → error AgentResponse
const subResults = settled.map((result, index) => {
if (result.status === 'fulfilled') {
return result.value;
@ -139,15 +139,15 @@ export class ParallelRunner {
const failedMovement = subMovements[index]!;
const errorMsg = getErrorMessage(result.reason);
log.error('Sub-movement failed', { movement: failedMovement.name, error: errorMsg });
const blockedResponse: AgentResponse = {
const errorResponse: AgentResponse = {
persona: failedMovement.name,
status: 'blocked',
status: 'error',
content: '',
timestamp: new Date(),
error: errorMsg,
};
state.movementOutputs.set(failedMovement.name, blockedResponse);
return { subMovement: failedMovement, response: blockedResponse, instruction: '' };
state.movementOutputs.set(failedMovement.name, errorResponse);
return { subMovement: failedMovement, response: errorResponse, instruction: '' };
});
// If all sub-movements failed (error-originated), throw

View File

@ -522,6 +522,13 @@ export class PieceEngine extends EventEmitter {
break;
}
// An 'error' status aborts the piece immediately: mark the state as aborted,
// emit 'piece:abort' with a reason, and leave the movement loop.
if (response.status === 'error') {
  // Use `||` rather than `??`: an empty error/content string carries no
  // information, so it must fall through to the next candidate instead of
  // producing an abort reason with a blank detail.
  const detail = response.error || response.content || `Movement "${movement.name}" returned error status`;
  this.state.status = 'aborted';
  this.emit('piece:abort', this.state, `Movement "${movement.name}" failed: ${detail}`);
  break;
}
let nextMovement = this.resolveNextMovement(movement, response);
log.debug('Movement transition', {
from: movement.name,

View File

@ -109,7 +109,7 @@ export async function callAIWithRetry(
onStream: display.createHandler(),
});
display.flush();
const success = response.status !== 'blocked';
const success = response.status !== 'blocked' && response.status !== 'error';
if (!success && sessionId) {
log.info('Session invalid, retrying without session');
@ -129,7 +129,7 @@ export async function callAIWithRetry(
updatePersonaSession(cwd, ctx.personaName, sessionId, ctx.providerType);
}
return {
result: { content: retry.content, sessionId: retry.sessionId, success: retry.status !== 'blocked' },
result: { content: retry.content, sessionId: retry.sessionId, success: retry.status !== 'blocked' && retry.status !== 'error' },
sessionId,
};
}

View File

@ -29,7 +29,7 @@ export class ClaudeClient {
if (result.interrupted) {
return 'interrupted';
}
return 'blocked';
return 'error';
}
return 'done';
}
@ -146,7 +146,7 @@ export class ClaudeClient {
return {
persona: `skill:${skillName}`,
status: result.success ? 'done' : 'blocked',
status: result.success ? 'done' : 'error',
content: result.content,
timestamp: new Date(),
sessionId: result.sessionId,

View File

@ -25,6 +25,18 @@ export type { CodexCallOptions } from './types.js';
const log = createLogger('codex-sdk');
const CODEX_STREAM_IDLE_TIMEOUT_MS = 10 * 60 * 1000;
const CODEX_STREAM_ABORTED_MESSAGE = 'Codex execution aborted';
/** Upper bound on attempts for one Codex call (the retry loop counts from 1 up to this value). */
const CODEX_RETRY_MAX_ATTEMPTS = 3;
/** Delay before the first retry in milliseconds; doubled for each subsequent attempt. */
const CODEX_RETRY_BASE_DELAY_MS = 250;
/**
 * Substrings that mark a failure message as transient and therefore retryable.
 *
 * Entries must be lowercase: the failure message is lowercased before the
 * substring match, so an entry containing uppercase characters would never match.
 * Typed as readonly so this shared module-level list cannot be mutated by accident.
 */
const CODEX_RETRYABLE_ERROR_PATTERNS: readonly string[] = [
  'stream disconnected before completion',
  'transport error',
  'network error',
  'error decoding response body',
  'econnreset',
  'etimedout',
  'eai_again',
  'fetch failed',
];
/**
* Client for Codex SDK agent interactions.
@ -33,13 +45,49 @@ const CODEX_STREAM_ABORTED_MESSAGE = 'Codex execution aborted';
* and response processing.
*/
export class CodexClient {
/**
 * Decide whether a failed Codex run may be retried.
 *
 * @param message - Failure message reported for the run.
 * @param aborted - Whether the stream's abort signal has fired.
 * @param abortCause - Why the stream was aborted, if it was.
 * @returns `true` only when the run was not aborted and the message contains
 *          one of the known transient-error substrings (case-insensitive).
 */
private isRetriableError(message: string, aborted: boolean, abortCause?: 'timeout' | 'external'): boolean {
  // An aborted run (idle timeout or external cancellation) is never retried.
  const wasAborted = aborted || Boolean(abortCause);
  if (wasAborted) {
    return false;
  }

  const normalized = message.toLowerCase();
  for (const pattern of CODEX_RETRYABLE_ERROR_PATTERNS) {
    if (normalized.includes(pattern)) {
      return true;
    }
  }
  return false;
}
/**
 * Sleep before the next retry using exponential backoff.
 *
 * The delay is the base delay multiplied by 2^(attempt - 1), with the exponent
 * clamped at zero.
 *
 * @param attempt - Number of the attempt that just failed.
 * @param signal - Optional abort signal; when it fires (or has already fired)
 *                 the wait ends early with a rejection.
 * @returns A promise that resolves once the delay has elapsed.
 * @throws Error carrying the Codex "aborted" message when the signal aborts the wait.
 */
private async waitForRetryDelay(attempt: number, signal?: AbortSignal): Promise<void> {
  const exponent = Math.max(0, attempt - 1);
  const delayMs = CODEX_RETRY_BASE_DELAY_MS * (2 ** exponent);

  await new Promise<void>((resolve, reject) => {
    // Start the timer first; an already-aborted signal cancels it right below.
    const timer = setTimeout(() => {
      detachAbortListener();
      resolve();
    }, delayMs);

    function detachAbortListener(): void {
      if (signal) {
        signal.removeEventListener('abort', handleAbort);
      }
    }

    function handleAbort(): void {
      clearTimeout(timer);
      detachAbortListener();
      reject(new Error(CODEX_STREAM_ABORTED_MESSAGE));
    }

    if (!signal) {
      return;
    }
    if (signal.aborted) {
      handleAbort();
      return;
    }
    signal.addEventListener('abort', handleAbort, { once: true });
  });
}
/** Call Codex with an agent prompt */
async call(
agentType: string,
prompt: string,
options: CodexCallOptions,
): Promise<AgentResponse> {
const codex = new Codex(options.openaiApiKey ? { apiKey: options.openaiApiKey } : undefined);
const sandboxMode = options.permissionMode
? mapToCodexSandboxMode(options.permissionMode)
: 'workspace-write';
@ -48,186 +96,213 @@ export class CodexClient {
workingDirectory: options.cwd,
sandboxMode,
};
const thread = options.sessionId
? await codex.resumeThread(options.sessionId, threadOptions)
: await codex.startThread(threadOptions);
let threadId = extractThreadId(thread) || options.sessionId;
let threadId = options.sessionId;
const fullPrompt = options.systemPrompt
? `${options.systemPrompt}\n\n${prompt}`
: prompt;
let idleTimeoutId: ReturnType<typeof setTimeout> | undefined;
const streamAbortController = new AbortController();
const timeoutMessage = `Codex stream timed out after ${Math.floor(CODEX_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`;
let abortCause: 'timeout' | 'external' | undefined;
for (let attempt = 1; attempt <= CODEX_RETRY_MAX_ATTEMPTS; attempt++) {
const codex = new Codex(options.openaiApiKey ? { apiKey: options.openaiApiKey } : undefined);
const thread = threadId
? await codex.resumeThread(threadId, threadOptions)
: await codex.startThread(threadOptions);
let currentThreadId = extractThreadId(thread) || threadId;
const resetIdleTimeout = (): void => {
if (idleTimeoutId !== undefined) {
clearTimeout(idleTimeoutId);
}
idleTimeoutId = setTimeout(() => {
abortCause = 'timeout';
let idleTimeoutId: ReturnType<typeof setTimeout> | undefined;
const streamAbortController = new AbortController();
const timeoutMessage = `Codex stream timed out after ${Math.floor(CODEX_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`;
let abortCause: 'timeout' | 'external' | undefined;
const resetIdleTimeout = (): void => {
if (idleTimeoutId !== undefined) {
clearTimeout(idleTimeoutId);
}
idleTimeoutId = setTimeout(() => {
abortCause = 'timeout';
streamAbortController.abort();
}, CODEX_STREAM_IDLE_TIMEOUT_MS);
};
const onExternalAbort = (): void => {
abortCause = 'external';
streamAbortController.abort();
}, CODEX_STREAM_IDLE_TIMEOUT_MS);
};
};
const onExternalAbort = (): void => {
abortCause = 'external';
streamAbortController.abort();
};
if (options.abortSignal) {
if (options.abortSignal.aborted) {
streamAbortController.abort();
} else {
options.abortSignal.addEventListener('abort', onExternalAbort, { once: true });
if (options.abortSignal) {
if (options.abortSignal.aborted) {
streamAbortController.abort();
} else {
options.abortSignal.addEventListener('abort', onExternalAbort, { once: true });
}
}
}
try {
log.debug('Executing Codex thread', {
agentType,
model: options.model,
hasSystemPrompt: !!options.systemPrompt,
});
try {
log.debug('Executing Codex thread', {
agentType,
model: options.model,
hasSystemPrompt: !!options.systemPrompt,
attempt,
});
const { events } = await thread.runStreamed(fullPrompt, {
signal: streamAbortController.signal,
});
resetIdleTimeout();
let content = '';
const contentOffsets = new Map<string, number>();
let success = true;
let failureMessage = '';
const state = createStreamTrackingState();
for await (const event of events as AsyncGenerator<CodexEvent>) {
const { events } = await thread.runStreamed(fullPrompt, {
signal: streamAbortController.signal,
});
resetIdleTimeout();
if (event.type === 'thread.started') {
threadId = typeof event.thread_id === 'string' ? event.thread_id : threadId;
emitInit(options.onStream, options.model, threadId);
continue;
}
if (event.type === 'turn.failed') {
success = false;
if (event.error && typeof event.error === 'object' && 'message' in event.error) {
failureMessage = String((event.error as { message?: unknown }).message ?? '');
let content = '';
const contentOffsets = new Map<string, number>();
let success = true;
let failureMessage = '';
const state = createStreamTrackingState();
for await (const event of events as AsyncGenerator<CodexEvent>) {
resetIdleTimeout();
if (event.type === 'thread.started') {
currentThreadId = typeof event.thread_id === 'string' ? event.thread_id : currentThreadId;
emitInit(options.onStream, options.model, currentThreadId);
continue;
}
break;
}
if (event.type === 'error') {
success = false;
failureMessage = typeof event.message === 'string' ? event.message : 'Unknown error';
break;
}
if (event.type === 'item.started') {
const item = event.item as CodexItem | undefined;
if (item) {
emitCodexItemStart(item, options.onStream, state.startedItems);
if (event.type === 'turn.failed') {
success = false;
if (event.error && typeof event.error === 'object' && 'message' in event.error) {
failureMessage = String((event.error as { message?: unknown }).message ?? '');
}
break;
}
continue;
}
if (event.type === 'item.updated') {
const item = event.item as CodexItem | undefined;
if (item) {
if (item.type === 'agent_message' && typeof item.text === 'string') {
const itemId = item.id;
const text = item.text;
if (itemId) {
const prev = contentOffsets.get(itemId) ?? 0;
if (text.length > prev) {
if (prev === 0 && content.length > 0) {
content += '\n';
if (event.type === 'error') {
success = false;
failureMessage = typeof event.message === 'string' ? event.message : 'Unknown error';
break;
}
if (event.type === 'item.started') {
const item = event.item as CodexItem | undefined;
if (item) {
emitCodexItemStart(item, options.onStream, state.startedItems);
}
continue;
}
if (event.type === 'item.updated') {
const item = event.item as CodexItem | undefined;
if (item) {
if (item.type === 'agent_message' && typeof item.text === 'string') {
const itemId = item.id;
const text = item.text;
if (itemId) {
const prev = contentOffsets.get(itemId) ?? 0;
if (text.length > prev) {
if (prev === 0 && content.length > 0) {
content += '\n';
}
content += text.slice(prev);
contentOffsets.set(itemId, text.length);
}
content += text.slice(prev);
contentOffsets.set(itemId, text.length);
}
}
emitCodexItemUpdate(item, options.onStream, state);
}
emitCodexItemUpdate(item, options.onStream, state);
continue;
}
continue;
}
if (event.type === 'item.completed') {
const item = event.item as CodexItem | undefined;
if (item) {
if (item.type === 'agent_message' && typeof item.text === 'string') {
const itemId = item.id;
const text = item.text;
if (itemId) {
const prev = contentOffsets.get(itemId) ?? 0;
if (text.length > prev) {
if (prev === 0 && content.length > 0) {
if (event.type === 'item.completed') {
const item = event.item as CodexItem | undefined;
if (item) {
if (item.type === 'agent_message' && typeof item.text === 'string') {
const itemId = item.id;
const text = item.text;
if (itemId) {
const prev = contentOffsets.get(itemId) ?? 0;
if (text.length > prev) {
if (prev === 0 && content.length > 0) {
content += '\n';
}
content += text.slice(prev);
contentOffsets.set(itemId, text.length);
}
} else if (text) {
if (content.length > 0) {
content += '\n';
}
content += text.slice(prev);
contentOffsets.set(itemId, text.length);
content += text;
}
} else if (text) {
if (content.length > 0) {
content += '\n';
}
content += text;
}
emitCodexItemCompleted(item, options.onStream, state);
}
emitCodexItemCompleted(item, options.onStream, state);
continue;
}
continue;
}
}
if (!success) {
const message = failureMessage || 'Codex execution failed';
emitResult(options.onStream, false, message, threadId);
if (!success) {
const message = failureMessage || 'Codex execution failed';
const retriable = this.isRetriableError(message, streamAbortController.signal.aborted, abortCause);
if (retriable && attempt < CODEX_RETRY_MAX_ATTEMPTS) {
log.info('Retrying Codex call after transient failure', { agentType, attempt, message });
threadId = currentThreadId;
await this.waitForRetryDelay(attempt, options.abortSignal);
continue;
}
emitResult(options.onStream, false, message, currentThreadId);
return {
persona: agentType,
status: 'error',
content: message,
timestamp: new Date(),
sessionId: currentThreadId,
};
}
const trimmed = content.trim();
emitResult(options.onStream, true, trimmed, currentThreadId);
return {
persona: agentType,
status: 'blocked',
content: message,
status: 'done',
content: trimmed,
timestamp: new Date(),
sessionId: threadId,
sessionId: currentThreadId,
};
}
} catch (error) {
const message = getErrorMessage(error);
const errorMessage = streamAbortController.signal.aborted
? abortCause === 'timeout'
? timeoutMessage
: CODEX_STREAM_ABORTED_MESSAGE
: message;
const trimmed = content.trim();
emitResult(options.onStream, true, trimmed, threadId);
const retriable = this.isRetriableError(errorMessage, streamAbortController.signal.aborted, abortCause);
if (retriable && attempt < CODEX_RETRY_MAX_ATTEMPTS) {
log.info('Retrying Codex call after transient exception', { agentType, attempt, errorMessage });
threadId = currentThreadId;
await this.waitForRetryDelay(attempt, options.abortSignal);
continue;
}
return {
persona: agentType,
status: 'done',
content: trimmed,
timestamp: new Date(),
sessionId: threadId,
};
} catch (error) {
const message = getErrorMessage(error);
const errorMessage = streamAbortController.signal.aborted
? abortCause === 'timeout'
? timeoutMessage
: CODEX_STREAM_ABORTED_MESSAGE
: message;
emitResult(options.onStream, false, errorMessage, threadId);
emitResult(options.onStream, false, errorMessage, currentThreadId);
return {
persona: agentType,
status: 'blocked',
content: errorMessage,
timestamp: new Date(),
sessionId: threadId,
};
} finally {
if (idleTimeoutId !== undefined) {
clearTimeout(idleTimeoutId);
}
if (options.abortSignal) {
options.abortSignal.removeEventListener('abort', onExternalAbort);
return {
persona: agentType,
status: 'error',
content: errorMessage,
timestamp: new Date(),
sessionId: currentThreadId,
};
} finally {
if (idleTimeoutId !== undefined) {
clearTimeout(idleTimeoutId);
}
if (options.abortSignal) {
options.abortSignal.removeEventListener('abort', onExternalAbort);
}
}
}
throw new Error('Unreachable: Codex retry loop exhausted without returning');
}
/** Call Codex with a custom agent configuration (system prompt + prompt) */

View File

@ -130,7 +130,7 @@ function validateEntry(entry: unknown, index: number): ScenarioEntry {
}
// status defaults to 'done'
const validStatuses = ['done', 'blocked', 'approved', 'rejected', 'improve'] as const;
const validStatuses = ['done', 'blocked', 'error', 'approved', 'rejected', 'improve'] as const;
const status = obj.status ?? 'done';
if (typeof status !== 'string' || !validStatuses.includes(status as typeof validStatuses[number])) {
throw new Error(

View File

@ -12,7 +12,7 @@ export interface MockCallOptions {
/** Fixed response content (optional, defaults to generic mock response) */
mockResponse?: string;
/** Fixed status to return (optional, defaults to 'done') */
mockStatus?: 'done' | 'blocked' | 'approved' | 'rejected' | 'improve';
mockStatus?: 'done' | 'blocked' | 'error' | 'approved' | 'rejected' | 'improve';
}
/** A single entry in a mock scenario */
@ -20,7 +20,7 @@ export interface ScenarioEntry {
/** Persona name to match (optional — if omitted, consumed by call order) */
persona?: string;
/** Response status */
status: 'done' | 'blocked' | 'approved' | 'rejected' | 'improve';
status: 'done' | 'blocked' | 'error' | 'approved' | 'rejected' | 'improve';
/** Response content body */
content: string;
}

View File

@ -36,10 +36,10 @@ function toCodexOptions(options: ProviderCallOptions): CodexCallOptions {
};
}
function blockedResponse(agentName: string): AgentResponse {
function errorResponse(agentName: string): AgentResponse {
return {
persona: agentName,
status: 'blocked',
status: 'error',
content: NOT_GIT_REPO_MESSAGE,
timestamp: new Date(),
};
@ -59,7 +59,7 @@ export class CodexProvider implements Provider {
if (systemPrompt) {
return {
call: async (prompt: string, options: ProviderCallOptions): Promise<AgentResponse> => {
if (!isInsideGitRepo(options.cwd)) return blockedResponse(name);
if (!isInsideGitRepo(options.cwd)) return errorResponse(name);
return callCodexCustom(name, prompt, systemPrompt, toCodexOptions(options));
},
};
@ -67,7 +67,7 @@ export class CodexProvider implements Provider {
return {
call: async (prompt: string, options: ProviderCallOptions): Promise<AgentResponse> => {
if (!isInsideGitRepo(options.cwd)) return blockedResponse(name);
if (!isInsideGitRepo(options.cwd)) return errorResponse(name);
return callCodex(name, prompt, toCodexOptions(options));
},
};