diff --git a/src/__tests__/opencode-client-cleanup.test.ts b/src/__tests__/opencode-client-cleanup.test.ts index af74d2c..b4ebd59 100644 --- a/src/__tests__/opencode-client-cleanup.test.ts +++ b/src/__tests__/opencode-client-cleanup.test.ts @@ -399,6 +399,52 @@ describe('OpenCodeClient stream cleanup', () => { ); }); + it('should pass empty tools object to promptAsync when allowedTools is an explicit empty array', async () => { + const { OpenCodeClient } = await import('../infra/opencode/client.js'); + const stream = new MockEventStream([ + { + type: 'message.updated', + properties: { + info: { + sessionID: 'session-empty-tools', + role: 'assistant', + time: { created: Date.now(), completed: Date.now() + 1 }, + }, + }, + }, + ]); + + const promptAsync = vi.fn().mockResolvedValue(undefined); + const sessionCreate = vi.fn().mockResolvedValue({ data: { id: 'session-empty-tools' } }); + const disposeInstance = vi.fn().mockResolvedValue({ data: {} }); + const subscribe = vi.fn().mockResolvedValue({ stream }); + + createOpencodeMock.mockResolvedValue({ + client: { + instance: { dispose: disposeInstance }, + session: { create: sessionCreate, promptAsync }, + event: { subscribe }, + permission: { reply: vi.fn() }, + }, + server: { close: vi.fn() }, + }); + + const client = new OpenCodeClient(); + const result = await client.call('coder', 'hello', { + cwd: '/tmp', + model: 'opencode/big-pickle', + allowedTools: [], + }); + + expect(result.status).toBe('done'); + expect(promptAsync).toHaveBeenCalledWith( + expect.objectContaining({ + tools: {}, + }), + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + }); + it('should configure allow permissions for edit mode', async () => { const { OpenCodeClient } = await import('../infra/opencode/client.js'); const stream = new MockEventStream([ diff --git a/src/__tests__/opencode-types.test.ts b/src/__tests__/opencode-types.test.ts index e8ad9b3..f661940 100644 --- a/src/__tests__/opencode-types.test.ts +++ b/src/__tests__/opencode-types.test.ts @@ -54,7 +54,10 @@ describe('mapToOpenCodeTools', () => { it('should return undefined when tools are not provided', () => { expect(mapToOpenCodeTools(undefined)).toBeUndefined(); - expect(mapToOpenCodeTools([])).toBeUndefined(); + }); + + it('should return empty tool map when explicit empty tools are provided', () => { + expect(mapToOpenCodeTools([])).toEqual({}); }); }); diff --git a/src/infra/codex/client.ts b/src/infra/codex/client.ts index e0d43ec..1fb5da4 100644 --- a/src/infra/codex/client.ts +++ b/src/infra/codex/client.ts @@ -6,7 +6,7 @@ import { Codex } from '@openai/codex-sdk'; import type { AgentResponse } from '../../core/models/index.js'; -import { createLogger, getErrorMessage } from '../../shared/utils/index.js'; +import { createLogger, getErrorMessage, createStreamDiagnostics, type StreamDiagnostics } from '../../shared/utils/index.js'; import { mapToCodexSandboxMode, type CodexCallOptions } from './types.js'; import { type CodexEvent, @@ -113,12 +113,14 @@ export class CodexClient { const streamAbortController = new AbortController(); const timeoutMessage = `Codex stream timed out after ${Math.floor(CODEX_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`; let abortCause: 'timeout' | 'external' | undefined; + let diagRef: StreamDiagnostics | undefined; const resetIdleTimeout = (): void => { if (idleTimeoutId !== undefined) { clearTimeout(idleTimeoutId); } idleTimeoutId = setTimeout(() => { + diagRef?.onIdleTimeoutFired(); abortCause = 'timeout'; streamAbortController.abort(); }, CODEX_STREAM_IDLE_TIMEOUT_MS); @@ -145,10 +147,14 @@ export class CodexClient { attempt, }); + const diag = createStreamDiagnostics('codex-sdk', { agentType, model: options.model, attempt }); + diagRef = diag; + const { events } = await thread.runStreamed(fullPrompt, { signal: streamAbortController.signal, }); resetIdleTimeout(); + diag.onConnected(); let content = ''; const contentOffsets = new Map(); @@ -158,6 +164,8 @@ export class CodexClient { for await (const event of events as AsyncGenerator) { resetIdleTimeout(); + diag.onFirstEvent(event.type); + diag.onEvent(event.type); if (event.type === 'thread.started') { currentThreadId = typeof event.thread_id === 'string' ? event.thread_id : currentThreadId; @@ -170,12 +178,14 @@ export class CodexClient { if (event.error && typeof event.error === 'object' && 'message' in event.error) { failureMessage = String((event.error as { message?: unknown }).message ?? ''); } + diag.onStreamError('turn.failed', failureMessage); break; } if (event.type === 'error') { success = false; failureMessage = typeof event.message === 'string' ? event.message : 'Unknown error'; + diag.onStreamError('error', failureMessage); break; } @@ -237,6 +247,8 @@ export class CodexClient { } } + diag.onCompleted(success ? 'normal' : 'error', success ? undefined : failureMessage); + if (!success) { const message = failureMessage || 'Codex execution failed'; const retriable = this.isRetriableError(message, streamAbortController.signal.aborted, abortCause); @@ -275,6 +287,11 @@ export class CodexClient { : CODEX_STREAM_ABORTED_MESSAGE : message; + diagRef?.onCompleted( + abortCause === 'timeout' ? 'timeout' : streamAbortController.signal.aborted ? 'abort' : 'error', + errorMessage, + ); + const retriable = this.isRetriableError(errorMessage, streamAbortController.signal.aborted, abortCause); if (retriable && attempt < CODEX_RETRY_MAX_ATTEMPTS) { log.info('Retrying Codex call after transient exception', { agentType, attempt, errorMessage }); diff --git a/src/infra/opencode/client.ts b/src/infra/opencode/client.ts index d853797..18c70b6 100644 --- a/src/infra/opencode/client.ts +++ b/src/infra/opencode/client.ts @@ -8,7 +8,7 @@ import { createOpencode } from '@opencode-ai/sdk/v2'; import { createServer } from 'node:net'; import type { AgentResponse } from '../../core/models/index.js'; -import { createLogger, getErrorMessage } from '../../shared/utils/index.js'; +import { createLogger, getErrorMessage, createStreamDiagnostics, type StreamDiagnostics } from '../../shared/utils/index.js'; import { parseProviderModel } from '../../shared/utils/providerModel.js'; import { buildOpenCodePermissionConfig, @@ -251,6 +251,7 @@ export class OpenCodeClient { const streamAbortController = new AbortController(); const timeoutMessage = `OpenCode stream timed out after ${Math.floor(OPENCODE_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`; let abortCause: 'timeout' | 'external' | undefined; + let diagRef: StreamDiagnostics | undefined; let serverClose: (() => void) | undefined; let opencodeApiClient: Awaited>['client'] | undefined; @@ -259,6 +260,7 @@ export class OpenCodeClient { clearTimeout(idleTimeoutId); } idleTimeoutId = setTimeout(() => { + diagRef?.onIdleTimeoutFired(); abortCause = 'timeout'; streamAbortController.abort(); }, OPENCODE_STREAM_IDLE_TIMEOUT_MS); @@ -285,6 +287,9 @@ export class OpenCodeClient { attempt, }); + const diag = createStreamDiagnostics('opencode-sdk', { agentType, model: options.model, attempt }); + diagRef = diag; + const parsedModel = parseProviderModel(options.model, 'OpenCode model'); const fullModel = `${parsedModel.providerID}/${parsedModel.modelID}`; const port = await getFreePort(); @@ -321,6 +326,7 @@ export class OpenCodeClient { { signal: streamAbortController.signal }, ); resetIdleTimeout(); + diag.onConnected(); const tools = mapToOpenCodeTools(options.allowedTools); await client.session.promptAsync( @@ -349,6 +355,8 @@ export class OpenCodeClient { resetIdleTimeout(); const sseEvent = event as OpenCodeStreamEvent; + diag.onFirstEvent(sseEvent.type); + diag.onEvent(sseEvent.type); if (sseEvent.type === 'message.part.updated') { const props = sseEvent.properties as { part: OpenCodePart; delta?: string }; const part = props.part; @@ -458,6 +466,7 @@ export class OpenCodeClient { if (streamError) { success = false; failureMessage = streamError; + diag.onStreamError('message.updated', streamError); break; } } @@ -479,6 +488,7 @@ export class OpenCodeClient { if (streamError) { success = false; failureMessage = streamError; + diag.onStreamError('message.completed', streamError); break; } } @@ -498,6 +508,7 @@ export class OpenCodeClient { if (isCurrentAssistantMessage) { success = false; failureMessage = extractOpenCodeErrorMessage(info?.error) ?? 'OpenCode message failed'; + diag.onStreamError('message.failed', failureMessage); break; } continue; @@ -530,6 +541,7 @@ export class OpenCodeClient { if (!errorProps.sessionID || errorProps.sessionID === sessionId) { success = false; failureMessage = errorProps.error?.data?.message ?? 'OpenCode session error'; + diag.onStreamError('session.error', failureMessage); break; } continue; @@ -537,6 +549,7 @@ export class OpenCodeClient { } content = [...textContentParts.values()].join('\n'); + diag.onCompleted(success ? 'normal' : 'error', success ? undefined : failureMessage); if (!success) { const message = failureMessage || 'OpenCode execution failed'; @@ -575,6 +588,11 @@ export class OpenCodeClient { : OPENCODE_STREAM_ABORTED_MESSAGE : message; + diagRef?.onCompleted( + abortCause === 'timeout' ? 'timeout' : streamAbortController.signal.aborted ? 'abort' : 'error', + errorMessage, + ); + const retriable = this.isRetriableError(errorMessage, streamAbortController.signal.aborted, abortCause); if (retriable && attempt < OPENCODE_RETRY_MAX_ATTEMPTS) { log.info('Retrying OpenCode call after transient exception', { agentType, attempt, errorMessage }); diff --git a/src/infra/opencode/types.ts b/src/infra/opencode/types.ts index 490561d..fb5fa5a 100644 --- a/src/infra/opencode/types.ts +++ b/src/infra/opencode/types.ts @@ -127,9 +127,12 @@ const BUILTIN_TOOL_MAP: Record = { }; export function mapToOpenCodeTools(allowedTools?: string[]): Record | undefined { - if (!allowedTools || allowedTools.length === 0) { + if (!allowedTools) { return undefined; } + if (allowedTools.length === 0) { + return {}; + } const mapped = new Set(); for (const tool of allowedTools) { @@ -142,7 +145,7 @@ export function mapToOpenCodeTools(allowedTools?: string[]): Record = {}; diff --git a/src/shared/utils/index.ts b/src/shared/utils/index.ts index 340d55c..69050cb 100644 --- a/src/shared/utils/index.ts +++ b/src/shared/utils/index.ts @@ -10,6 +10,7 @@ export * from './reportDir.js'; export * from './slackWebhook.js'; export * from './sleep.js'; export * from './slug.js'; +export * from './streamDiagnostics.js'; export * from './taskPaths.js'; export * from './text.js'; export * from './types.js'; diff --git a/src/shared/utils/streamDiagnostics.ts b/src/shared/utils/streamDiagnostics.ts new file mode 100644 index 0000000..5a8b15b --- /dev/null +++ b/src/shared/utils/streamDiagnostics.ts @@ -0,0 +1,84 @@ +/** + * Stream lifecycle diagnostics for provider clients. + * + * Tracks connection, iteration, event counts, and completion + * to fill the observability gap between stream start and timeout/error. + * All output is debug-level only. + */ + +import { createLogger } from './debug.js'; + +export interface StreamDiagnostics { + /** Call when the stream connection resolves (runStreamed / subscribe) */ + onConnected(): void; + /** Call at the top of each for-await iteration (logs only on first call) */ + onFirstEvent(eventType: string): void; + /** Call for each event to track count and last type (no log output) */ + onEvent(eventType: string): void; + /** Call when the idle timeout callback fires (before abort) */ + onIdleTimeoutFired(): void; + /** Call when error events are received (turn.failed, session.error, etc.) */ + onStreamError(eventType: string, message: string): void; + /** Call on stream completion with reason */ + onCompleted(reason: 'normal' | 'timeout' | 'abort' | 'error', detail?: string): void; +} + +export function createStreamDiagnostics( + component: string, + context: Record, +): StreamDiagnostics { + const log = createLogger(component); + const startTime = Date.now(); + let eventCount = 0; + let lastEventType = ''; + let lastEventTime = 0; + let connected = false; + let firstEventLogged = false; + + return { + onConnected() { + connected = true; + log.debug('Stream connected', { ...context, elapsedMs: Date.now() - startTime }); + }, + + onFirstEvent(eventType: string) { + if (firstEventLogged) return; + firstEventLogged = true; + log.debug('Stream first event', { ...context, firstEventType: eventType, elapsedMs: Date.now() - startTime }); + }, + + onEvent(eventType: string) { + eventCount++; + lastEventType = eventType; + lastEventTime = Date.now(); + }, + + onIdleTimeoutFired() { + log.debug('Idle timeout fired', { + ...context, + eventCount, + lastEventType, + msSinceLastEvent: lastEventTime > 0 ? Date.now() - lastEventTime : undefined, + connected, + iterationStarted: firstEventLogged, + }); + }, + + onStreamError(eventType: string, message: string) { + log.debug('Stream error event', { ...context, eventType, message, eventCount }); + }, + + onCompleted(reason: 'normal' | 'timeout' | 'abort' | 'error', detail?: string) { + log.debug('Stream completed', { + ...context, + reason, + detail, + eventCount, + lastEventType, + durationMs: Date.now() - startTime, + connected, + iterationStarted: firstEventLogged, + }); + }, + }; +}