Codex プロセスのハングによる worker pool スロット占有を防止

Codex CLI プロセスが API 応答待ちのまま無応答になると、イベントストリームを読む
for await ループが永久にブロックし、worker pool のスロットを占有し続ける問題に対処した。
あわせて AbortSignal の伝播経路を整備し、2層のタイムアウトを導入した。

- Codex ストリームのアイドルタイムアウト(10分無応答で中断)
- タスクレベルのタイムアウト(並列実行時のみ有効、1時間で中断)
- AbortSignal を worker pool → PieceEngine → AgentRunner → Codex SDK まで伝播
This commit is contained in:
nrslib 2026-02-09 09:59:14 +09:00
parent 88f7b38796
commit 55559cc41c
10 changed files with 92 additions and 5 deletions

View File

@ -100,6 +100,7 @@ export class AgentRunner {
): ProviderCallOptions { ): ProviderCallOptions {
return { return {
cwd: options.cwd, cwd: options.cwd,
abortSignal: options.abortSignal,
sessionId: options.sessionId, sessionId: options.sessionId,
allowedTools: options.allowedTools ?? agentConfig?.allowedTools, allowedTools: options.allowedTools ?? agentConfig?.allowedTools,
mcpServers: options.mcpServers, mcpServers: options.mcpServers,

View File

@ -10,6 +10,7 @@ export type { StreamCallback };
/** Common options for running agents */ /** Common options for running agents */
export interface RunAgentOptions { export interface RunAgentOptions {
cwd: string; cwd: string;
abortSignal?: AbortSignal;
sessionId?: string; sessionId?: string;
model?: string; model?: string;
provider?: 'claude' | 'codex' | 'mock'; provider?: 'claude' | 'codex' | 'mock';

View File

@ -33,6 +33,7 @@ export class OptionsBuilder {
return { return {
cwd: this.getCwd(), cwd: this.getCwd(),
abortSignal: this.engineOptions.abortSignal,
personaPath: step.personaPath, personaPath: step.personaPath,
provider: step.provider ?? this.engineOptions.personaProviders?.[step.personaDisplayName] ?? this.engineOptions.provider, provider: step.provider ?? this.engineOptions.personaProviders?.[step.personaDisplayName] ?? this.engineOptions.provider,
model: step.model ?? this.engineOptions.model, model: step.model ?? this.engineOptions.model,

View File

@ -153,6 +153,7 @@ export type IterationLimitCallback = (request: IterationLimitRequest) => Promise
/** Options for piece engine */ /** Options for piece engine */
export interface PieceEngineOptions { export interface PieceEngineOptions {
abortSignal?: AbortSignal;
/** Callback for streaming real-time output */ /** Callback for streaming real-time output */
onStream?: StreamCallback; onStream?: StreamCallback;
/** Callback for requesting user input when an agent is blocked */ /** Callback for requesting user input when an agent is blocked */

View File

@ -331,6 +331,7 @@ export async function executePiece(
: undefined; : undefined;
const engine = new PieceEngine(pieceConfig, cwd, task, { const engine = new PieceEngine(pieceConfig, cwd, task, {
abortSignal: options.abortSignal,
onStream: streamHandler, onStream: streamHandler,
onUserInput, onUserInput,
initialSessions: savedSessions, initialSessions: savedSessions,

View File

@ -23,6 +23,7 @@ import { resolveTaskExecution } from './resolveTask.js';
export type { TaskExecutionOptions, ExecuteTaskOptions }; export type { TaskExecutionOptions, ExecuteTaskOptions };
const log = createLogger('task'); const log = createLogger('task');
const TASK_TIMEOUT_MS = 60 * 60 * 1000;
/** /**
* Resolve a GitHub issue from task data's issue number. * Resolve a GitHub issue from task data's issue number.
@ -107,12 +108,29 @@ export async function executeAndCompleteTask(
): Promise<boolean> { ): Promise<boolean> {
const startedAt = new Date().toISOString(); const startedAt = new Date().toISOString();
const executionLog: string[] = []; const executionLog: string[] = [];
const taskAbortController = new AbortController();
const externalAbortSignal = parallelOptions?.abortSignal;
const taskTimeoutMs = externalAbortSignal ? TASK_TIMEOUT_MS : undefined;
const taskAbortSignal = externalAbortSignal ? taskAbortController.signal : undefined;
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const onExternalAbort = (): void => {
taskAbortController.abort();
};
if (externalAbortSignal) {
if (externalAbortSignal.aborted) {
taskAbortController.abort();
} else {
externalAbortSignal.addEventListener('abort', onExternalAbort, { once: true });
}
}
try { try {
const { execCwd, execPiece, isWorktree, branch, baseBranch, startMovement, retryNote, autoPr, issueNumber } = await resolveTaskExecution(task, cwd, pieceName); const { execCwd, execPiece, isWorktree, branch, baseBranch, startMovement, retryNote, autoPr, issueNumber } = await resolveTaskExecution(task, cwd, pieceName);
// cwd is always the project root; pass it as projectCwd so reports/sessions go there // cwd is always the project root; pass it as projectCwd so reports/sessions go there
const taskSuccess = await executeTask({ const taskRunPromise = executeTask({
task: task.content, task: task.content,
cwd: execCwd, cwd: execCwd,
pieceIdentifier: execPiece, pieceIdentifier: execPiece,
@ -120,10 +138,26 @@ export async function executeAndCompleteTask(
agentOverrides: options, agentOverrides: options,
startMovement, startMovement,
retryNote, retryNote,
abortSignal: parallelOptions?.abortSignal, abortSignal: taskAbortSignal,
taskPrefix: parallelOptions?.taskPrefix, taskPrefix: parallelOptions?.taskPrefix,
taskColorIndex: parallelOptions?.taskColorIndex, taskColorIndex: parallelOptions?.taskColorIndex,
}); });
const timeoutPromise = taskTimeoutMs && taskTimeoutMs > 0
? new Promise<boolean>((_, reject) => {
timeoutId = setTimeout(() => {
taskAbortController.abort();
reject(new Error(`Task timed out after ${Math.floor(taskTimeoutMs / 60000)} minutes`));
}, taskTimeoutMs);
})
: undefined;
const taskSuccess = timeoutPromise
? await Promise.race<boolean>([
taskRunPromise,
timeoutPromise,
])
: await taskRunPromise;
const completedAt = new Date().toISOString(); const completedAt = new Date().toISOString();
if (taskSuccess && isWorktree) { if (taskSuccess && isWorktree) {
@ -192,6 +226,13 @@ export async function executeAndCompleteTask(
error(`Task "${task.name}" error: ${getErrorMessage(err)}`); error(`Task "${task.name}" error: ${getErrorMessage(err)}`);
return false; return false;
} finally {
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
}
if (externalAbortSignal) {
externalAbortSignal.removeEventListener('abort', onExternalAbort);
}
} }
} }

View File

@ -23,6 +23,7 @@ import {
export type { CodexCallOptions } from './types.js'; export type { CodexCallOptions } from './types.js';
const log = createLogger('codex-sdk'); const log = createLogger('codex-sdk');
const CODEX_STREAM_IDLE_TIMEOUT_MS = 10 * 60 * 1000;
/** /**
* Client for Codex SDK agent interactions. * Client for Codex SDK agent interactions.
@ -55,6 +56,31 @@ export class CodexClient {
? `${options.systemPrompt}\n\n${prompt}` ? `${options.systemPrompt}\n\n${prompt}`
: prompt; : prompt;
let idleTimeoutId: ReturnType<typeof setTimeout> | undefined;
const streamAbortController = new AbortController();
const abortMessage = `Codex stream timed out after ${Math.floor(CODEX_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`;
const resetIdleTimeout = (): void => {
if (idleTimeoutId !== undefined) {
clearTimeout(idleTimeoutId);
}
idleTimeoutId = setTimeout(() => {
streamAbortController.abort();
}, CODEX_STREAM_IDLE_TIMEOUT_MS);
};
const onExternalAbort = (): void => {
streamAbortController.abort();
};
if (options.abortSignal) {
if (options.abortSignal.aborted) {
streamAbortController.abort();
} else {
options.abortSignal.addEventListener('abort', onExternalAbort, { once: true });
}
}
try { try {
log.debug('Executing Codex thread', { log.debug('Executing Codex thread', {
agentType, agentType,
@ -62,7 +88,10 @@ export class CodexClient {
hasSystemPrompt: !!options.systemPrompt, hasSystemPrompt: !!options.systemPrompt,
}); });
const { events } = await thread.runStreamed(fullPrompt); const { events } = await thread.runStreamed(fullPrompt, {
signal: streamAbortController.signal,
});
resetIdleTimeout();
let content = ''; let content = '';
const contentOffsets = new Map<string, number>(); const contentOffsets = new Map<string, number>();
let success = true; let success = true;
@ -70,6 +99,7 @@ export class CodexClient {
const state = createStreamTrackingState(); const state = createStreamTrackingState();
for await (const event of events as AsyncGenerator<CodexEvent>) { for await (const event of events as AsyncGenerator<CodexEvent>) {
resetIdleTimeout();
if (event.type === 'thread.started') { if (event.type === 'thread.started') {
threadId = typeof event.thread_id === 'string' ? event.thread_id : threadId; threadId = typeof event.thread_id === 'string' ? event.thread_id : threadId;
emitInit(options.onStream, options.model, threadId); emitInit(options.onStream, options.model, threadId);
@ -172,15 +202,23 @@ export class CodexClient {
}; };
} catch (error) { } catch (error) {
const message = getErrorMessage(error); const message = getErrorMessage(error);
emitResult(options.onStream, false, message, threadId); const errorMessage = streamAbortController.signal.aborted ? abortMessage : message;
emitResult(options.onStream, false, errorMessage, threadId);
return { return {
persona: agentType, persona: agentType,
status: 'blocked', status: 'blocked',
content: message, content: errorMessage,
timestamp: new Date(), timestamp: new Date(),
sessionId: threadId, sessionId: threadId,
}; };
} finally {
if (idleTimeoutId !== undefined) {
clearTimeout(idleTimeoutId);
}
if (options.abortSignal) {
options.abortSignal.removeEventListener('abort', onExternalAbort);
}
} }
} }

View File

@ -21,6 +21,7 @@ export function mapToCodexSandboxMode(mode: PermissionMode): CodexSandboxMode {
/** Options for calling Codex */ /** Options for calling Codex */
export interface CodexCallOptions { export interface CodexCallOptions {
cwd: string; cwd: string;
abortSignal?: AbortSignal;
sessionId?: string; sessionId?: string;
model?: string; model?: string;
systemPrompt?: string; systemPrompt?: string;

View File

@ -27,6 +27,7 @@ function isInsideGitRepo(cwd: string): boolean {
function toCodexOptions(options: ProviderCallOptions): CodexCallOptions { function toCodexOptions(options: ProviderCallOptions): CodexCallOptions {
return { return {
cwd: options.cwd, cwd: options.cwd,
abortSignal: options.abortSignal,
sessionId: options.sessionId, sessionId: options.sessionId,
model: options.model, model: options.model,
permissionMode: options.permissionMode, permissionMode: options.permissionMode,

View File

@ -20,6 +20,7 @@ export interface AgentSetup {
/** Runtime options passed at call time */ /** Runtime options passed at call time */
export interface ProviderCallOptions { export interface ProviderCallOptions {
cwd: string; cwd: string;
abortSignal?: AbortSignal;
sessionId?: string; sessionId?: string;
model?: string; model?: string;
allowedTools?: string[]; allowedTools?: string[];