From 55559cc41c6e19c06890dcfc83469d4a9344c26a Mon Sep 17 00:00:00 2001 From: nrslib <38722970+nrslib@users.noreply.github.com> Date: Mon, 9 Feb 2026 09:59:14 +0900 Subject: [PATCH] =?UTF-8?q?Codex=20=E3=83=97=E3=83=AD=E3=82=BB=E3=82=B9?= =?UTF-8?q?=E3=81=AE=E3=83=8F=E3=83=B3=E3=82=B0=E3=81=AB=E3=82=88=E3=82=8B?= =?UTF-8?q?=20worker=20pool=20=E3=82=B9=E3=83=AD=E3=83=83=E3=83=88?= =?UTF-8?q?=E5=8D=A0=E6=9C=89=E3=82=92=E9=98=B2=E6=AD=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex CLI プロセスが API 応答待ちで無応答になった場合、for await ループが 永久にブロックし worker pool のスロットを占有し続ける問題に対処。 AbortSignal の伝播経路を整備し、2層のタイムアウトを導入した。 - Codex ストリームのアイドルタイムアウト(10分無応答で中断) - タスクレベルのタイムアウト(並列実行時、1時間で中断) - AbortSignal を worker pool → PieceEngine → AgentRunner → Codex SDK まで伝播 --- src/agents/runner.ts | 1 + src/agents/types.ts | 1 + src/core/piece/engine/OptionsBuilder.ts | 1 + src/core/piece/types.ts | 1 + src/features/tasks/execute/pieceExecution.ts | 1 + src/features/tasks/execute/taskExecution.ts | 45 +++++++++++++++++++- src/infra/codex/client.ts | 44 +++++++++++++++++-- src/infra/codex/types.ts | 1 + src/infra/providers/codex.ts | 1 + src/infra/providers/types.ts | 1 + 10 files changed, 92 insertions(+), 5 deletions(-) diff --git a/src/agents/runner.ts b/src/agents/runner.ts index 92cf323..cb54c93 100644 --- a/src/agents/runner.ts +++ b/src/agents/runner.ts @@ -100,6 +100,7 @@ export class AgentRunner { ): ProviderCallOptions { return { cwd: options.cwd, + abortSignal: options.abortSignal, sessionId: options.sessionId, allowedTools: options.allowedTools ?? agentConfig?.allowedTools, mcpServers: options.mcpServers, diff --git a/src/agents/types.ts b/src/agents/types.ts index e739c84..cdfd2a6 100644 --- a/src/agents/types.ts +++ b/src/agents/types.ts @@ -10,6 +10,7 @@ export type { StreamCallback }; /** Common options for running agents */ export interface RunAgentOptions { cwd: string; + abortSignal?: AbortSignal; sessionId?: string; model?: string; provider?: 'claude' | 'codex' | 'mock'; diff --git a/src/core/piece/engine/OptionsBuilder.ts b/src/core/piece/engine/OptionsBuilder.ts index d5866cf..9114ca0 100644 --- a/src/core/piece/engine/OptionsBuilder.ts +++ b/src/core/piece/engine/OptionsBuilder.ts @@ -33,6 +33,7 @@ export class OptionsBuilder { return { cwd: this.getCwd(), + abortSignal: this.engineOptions.abortSignal, personaPath: step.personaPath, provider: step.provider ?? this.engineOptions.personaProviders?.[step.personaDisplayName] ?? this.engineOptions.provider, model: step.model ?? this.engineOptions.model, diff --git a/src/core/piece/types.ts b/src/core/piece/types.ts index 2fd1dc6..82b681c 100644 --- a/src/core/piece/types.ts +++ b/src/core/piece/types.ts @@ -153,6 +153,7 @@ export type IterationLimitCallback = (request: IterationLimitRequest) => Promise /** Options for piece engine */ export interface PieceEngineOptions { + abortSignal?: AbortSignal; /** Callback for streaming real-time output */ onStream?: StreamCallback; /** Callback for requesting user input when an agent is blocked */ diff --git a/src/features/tasks/execute/pieceExecution.ts b/src/features/tasks/execute/pieceExecution.ts index 786d67a..b543f62 100644 --- a/src/features/tasks/execute/pieceExecution.ts +++ b/src/features/tasks/execute/pieceExecution.ts @@ -331,6 +331,7 @@ export async function executePiece( : undefined; const engine = new PieceEngine(pieceConfig, cwd, task, { + abortSignal: options.abortSignal, onStream: streamHandler, onUserInput, initialSessions: savedSessions, diff --git a/src/features/tasks/execute/taskExecution.ts b/src/features/tasks/execute/taskExecution.ts index 460be26..fc412d5 100644 --- a/src/features/tasks/execute/taskExecution.ts +++ b/src/features/tasks/execute/taskExecution.ts @@ -23,6 +23,7 @@ import { resolveTaskExecution } from './resolveTask.js'; export type { TaskExecutionOptions, ExecuteTaskOptions }; const log = createLogger('task'); +const TASK_TIMEOUT_MS = 60 * 60 * 1000; /** * Resolve a GitHub issue from task data's issue number. @@ -107,12 +108,29 @@ export async function executeAndCompleteTask( ): Promise { const startedAt = new Date().toISOString(); const executionLog: string[] = []; + const taskAbortController = new AbortController(); + const externalAbortSignal = parallelOptions?.abortSignal; + const taskTimeoutMs = externalAbortSignal ? TASK_TIMEOUT_MS : undefined; + const taskAbortSignal = externalAbortSignal ? taskAbortController.signal : undefined; + let timeoutId: ReturnType | undefined; + + const onExternalAbort = (): void => { + taskAbortController.abort(); + }; + + if (externalAbortSignal) { + if (externalAbortSignal.aborted) { + taskAbortController.abort(); + } else { + externalAbortSignal.addEventListener('abort', onExternalAbort, { once: true }); + } + } try { const { execCwd, execPiece, isWorktree, branch, baseBranch, startMovement, retryNote, autoPr, issueNumber } = await resolveTaskExecution(task, cwd, pieceName); // cwd is always the project root; pass it as projectCwd so reports/sessions go there - const taskSuccess = await executeTask({ + const taskRunPromise = executeTask({ task: task.content, cwd: execCwd, pieceIdentifier: execPiece, @@ -120,10 +138,26 @@ export async function executeAndCompleteTask( agentOverrides: options, startMovement, retryNote, - abortSignal: parallelOptions?.abortSignal, + abortSignal: taskAbortSignal, taskPrefix: parallelOptions?.taskPrefix, taskColorIndex: parallelOptions?.taskColorIndex, }); + + const timeoutPromise = taskTimeoutMs && taskTimeoutMs > 0 + ? new Promise((_, reject) => { + timeoutId = setTimeout(() => { + taskAbortController.abort(); + reject(new Error(`Task timed out after ${Math.floor(taskTimeoutMs / 60000)} minutes`)); + }, taskTimeoutMs); + }) + : undefined; + + const taskSuccess = timeoutPromise + ? await Promise.race([ + taskRunPromise, + timeoutPromise, + ]) + : await taskRunPromise; const completedAt = new Date().toISOString(); if (taskSuccess && isWorktree) { @@ -192,6 +226,13 @@ export async function executeAndCompleteTask( error(`Task "${task.name}" error: ${getErrorMessage(err)}`); return false; + } finally { + if (timeoutId !== undefined) { + clearTimeout(timeoutId); + } + if (externalAbortSignal) { + externalAbortSignal.removeEventListener('abort', onExternalAbort); + } } } diff --git a/src/infra/codex/client.ts b/src/infra/codex/client.ts index 14b84ab..5c5d2b1 100644 --- a/src/infra/codex/client.ts +++ b/src/infra/codex/client.ts @@ -23,6 +23,7 @@ import { export type { CodexCallOptions } from './types.js'; const log = createLogger('codex-sdk'); +const CODEX_STREAM_IDLE_TIMEOUT_MS = 10 * 60 * 1000; /** * Client for Codex SDK agent interactions. @@ -55,6 +56,31 @@ export class CodexClient { ? `${options.systemPrompt}\n\n${prompt}` : prompt; + let idleTimeoutId: ReturnType | undefined; + const streamAbortController = new AbortController(); + const abortMessage = `Codex stream timed out after ${Math.floor(CODEX_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`; + + const resetIdleTimeout = (): void => { + if (idleTimeoutId !== undefined) { + clearTimeout(idleTimeoutId); + } + idleTimeoutId = setTimeout(() => { + streamAbortController.abort(); + }, CODEX_STREAM_IDLE_TIMEOUT_MS); + }; + + const onExternalAbort = (): void => { + streamAbortController.abort(); + }; + + if (options.abortSignal) { + if (options.abortSignal.aborted) { + streamAbortController.abort(); + } else { + options.abortSignal.addEventListener('abort', onExternalAbort, { once: true }); + } + } + try { log.debug('Executing Codex thread', { agentType, @@ -62,7 +88,10 @@ export class CodexClient { hasSystemPrompt: !!options.systemPrompt, }); - const { events } = await thread.runStreamed(fullPrompt); + const { events } = await thread.runStreamed(fullPrompt, { + signal: streamAbortController.signal, + }); + resetIdleTimeout(); let content = ''; const contentOffsets = new Map(); let success = true; @@ -70,6 +99,7 @@ export class CodexClient { const state = createStreamTrackingState(); for await (const event of events as AsyncGenerator) { + resetIdleTimeout(); if (event.type === 'thread.started') { threadId = typeof event.thread_id === 'string' ? event.thread_id : threadId; emitInit(options.onStream, options.model, threadId); @@ -172,15 +202,23 @@ export class CodexClient { }; } catch (error) { const message = getErrorMessage(error); - emitResult(options.onStream, false, message, threadId); + const errorMessage = streamAbortController.signal.aborted ? abortMessage : message; + emitResult(options.onStream, false, errorMessage, threadId); return { persona: agentType, status: 'blocked', - content: message, + content: errorMessage, timestamp: new Date(), sessionId: threadId, }; + } finally { + if (idleTimeoutId !== undefined) { + clearTimeout(idleTimeoutId); + } + if (options.abortSignal) { + options.abortSignal.removeEventListener('abort', onExternalAbort); + } } } diff --git a/src/infra/codex/types.ts b/src/infra/codex/types.ts index 6c6aa9a..1834167 100644 --- a/src/infra/codex/types.ts +++ b/src/infra/codex/types.ts @@ -21,6 +21,7 @@ export function mapToCodexSandboxMode(mode: PermissionMode): CodexSandboxMode { /** Options for calling Codex */ export interface CodexCallOptions { cwd: string; + abortSignal?: AbortSignal; sessionId?: string; model?: string; systemPrompt?: string; diff --git a/src/infra/providers/codex.ts b/src/infra/providers/codex.ts index 3ccdd46..4dcbdcd 100644 --- a/src/infra/providers/codex.ts +++ b/src/infra/providers/codex.ts @@ -27,6 +27,7 @@ function isInsideGitRepo(cwd: string): boolean { function toCodexOptions(options: ProviderCallOptions): CodexCallOptions { return { cwd: options.cwd, + abortSignal: options.abortSignal, sessionId: options.sessionId, model: options.model, permissionMode: options.permissionMode, diff --git a/src/infra/providers/types.ts b/src/infra/providers/types.ts index f0af97b..df688d9 100644 --- a/src/infra/providers/types.ts +++ b/src/infra/providers/types.ts @@ -20,6 +20,7 @@ export interface AgentSetup { /** Runtime options passed at call time */ export interface ProviderCallOptions { cwd: string; + abortSignal?: AbortSignal; sessionId?: string; model?: string; allowedTools?: string[];