diff --git a/src/__tests__/opencode-client-cleanup.test.ts b/src/__tests__/opencode-client-cleanup.test.ts index b4ebd59..ab40943 100644 --- a/src/__tests__/opencode-client-cleanup.test.ts +++ b/src/__tests__/opencode-client-cleanup.test.ts @@ -57,8 +57,10 @@ vi.mock('@opencode-ai/sdk/v2', () => ({ })); describe('OpenCodeClient stream cleanup', () => { - beforeEach(() => { + beforeEach(async () => { vi.clearAllMocks(); + const { resetSharedServer } = await import('../infra/opencode/client.js'); + resetSharedServer(); }); it('should close SSE stream when session.idle is received', async () => { @@ -445,52 +447,6 @@ describe('OpenCodeClient stream cleanup', () => { ); }); - it('should configure allow permissions for edit mode', async () => { - const { OpenCodeClient } = await import('../infra/opencode/client.js'); - const stream = new MockEventStream([ - { - type: 'message.updated', - properties: { - info: { - sessionID: 'session-perm', - role: 'assistant', - time: { created: Date.now(), completed: Date.now() + 1 }, - }, - }, - }, - ]); - - const promptAsync = vi.fn().mockResolvedValue(undefined); - const sessionCreate = vi.fn().mockResolvedValue({ data: { id: 'session-perm' } }); - const disposeInstance = vi.fn().mockResolvedValue({ data: {} }); - const subscribe = vi.fn().mockResolvedValue({ stream }); - - createOpencodeMock.mockResolvedValue({ - client: { - instance: { dispose: disposeInstance }, - session: { create: sessionCreate, promptAsync }, - event: { subscribe }, - permission: { reply: vi.fn() }, - }, - server: { close: vi.fn() }, - }); - - const client = new OpenCodeClient(); - await client.call('coder', 'hello', { - cwd: '/tmp', - model: 'opencode/big-pickle', - permissionMode: 'edit', - }); - - const createCallArgs = createOpencodeMock.mock.calls[0]?.[0] as { config?: Record }; - const permission = createCallArgs.config?.permission as Record; - expect(permission.read).toBe('allow'); - expect(permission.edit).toBe('allow'); - expect(permission.write).toBe('allow'); - expect(permission.bash).toBe('allow'); - expect(permission.question).toBe('deny'); - }); - it('should pass permission ruleset to session.create', async () => { const { OpenCodeClient } = await import('../infra/opencode/client.js'); const stream = new MockEventStream([ @@ -578,4 +534,85 @@ describe('OpenCodeClient stream cleanup', () => { expect(result.status).toBe('error'); expect(result.content).toContain('permission reply timed out'); }); + + it('should reuse shared server for parallel calls with same config', async () => { + const { OpenCodeClient, resetSharedServer } = await import('../infra/opencode/client.js'); + resetSharedServer(); + + let callCount = 0; + const sessionCreate = vi.fn().mockImplementation(() => { + callCount += 1; + return Promise.resolve({ data: { id: `session-${callCount}` } }); + }); + const promptAsync = vi.fn().mockResolvedValue(undefined); + const disposeInstance = vi.fn().mockResolvedValue({ data: {} }); + const serverClose = vi.fn(); + + createOpencodeMock.mockResolvedValue({ + client: { + instance: { dispose: disposeInstance }, + session: { create: sessionCreate, promptAsync }, + event: { subscribe: vi.fn().mockImplementation(() => { + const events = [{ type: 'session.idle', properties: { sessionID: `session-${callCount}` } }]; + return Promise.resolve({ stream: new MockEventStream(events) }); + }) }, + permission: { reply: vi.fn() }, + }, + server: { close: serverClose }, + }); + + const client = new OpenCodeClient(); + + const [result1, result2, result3] = await Promise.all([ + client.call('coder', 'task1', { cwd: '/tmp', model: 'opencode/big-pickle' }), + client.call('coder', 'task2', { cwd: '/tmp', model: 'opencode/big-pickle' }), + client.call('coder', 'task3', { cwd: '/tmp', model: 'opencode/big-pickle' }), + ]); + + expect(createOpencodeMock).toHaveBeenCalledTimes(1); + expect(sessionCreate).toHaveBeenCalledTimes(3); + expect(result1.status).toBe('done'); + expect(result2.status).toBe('done'); + expect(result3.status).toBe('done'); + expect(serverClose).not.toHaveBeenCalled(); + }); + + it('should create new server when model changes', async () => { + const { OpenCodeClient, resetSharedServer } = await import('../infra/opencode/client.js'); + resetSharedServer(); + + const sessionCreate = vi.fn().mockResolvedValue({ data: { id: 'session-1' } }); + const promptAsync = vi.fn().mockResolvedValue(undefined); + const disposeInstance = vi.fn().mockResolvedValue({ data: {} }); + const serverClose1 = vi.fn(); + const serverClose2 = vi.fn(); + + createOpencodeMock.mockResolvedValueOnce({ + client: { + instance: { dispose: disposeInstance }, + session: { create: sessionCreate, promptAsync }, + event: { subscribe: vi.fn().mockResolvedValue({ stream: new MockEventStream([{ type: 'session.idle', properties: { sessionID: 'session-1' } }]) }) }, + permission: { reply: vi.fn() }, + }, + server: { close: serverClose1 }, + }).mockResolvedValueOnce({ + client: { + instance: { dispose: disposeInstance }, + session: { create: sessionCreate, promptAsync }, + event: { subscribe: vi.fn().mockResolvedValue({ stream: new MockEventStream([{ type: 'session.idle', properties: { sessionID: 'session-2' } }]) }) }, + permission: { reply: vi.fn() }, + }, + server: { close: serverClose2 }, + }); + + const client = new OpenCodeClient(); + + const result1 = await client.call('coder', 'task1', { cwd: '/tmp', model: 'opencode/model-a' }); + const result2 = await client.call('coder', 'task2', { cwd: '/tmp', model: 'opencode/model-b' }); + + expect(createOpencodeMock).toHaveBeenCalledTimes(2); + expect(serverClose1).toHaveBeenCalled(); + expect(result1.status).toBe('done'); + expect(result2.status).toBe('done'); + }); }); diff --git a/src/infra/opencode/client.ts b/src/infra/opencode/client.ts index 07f3970..a0cf429 100644 --- a/src/infra/opencode/client.ts +++ b/src/infra/opencode/client.ts @@ -11,7 +11,6 @@ import type { AgentResponse } from '../../core/models/index.js'; import { createLogger, getErrorMessage, createStreamDiagnostics, parseStructuredOutput, type StreamDiagnostics } from '../../shared/utils/index.js'; import { parseProviderModel } from '../../shared/utils/providerModel.js'; import { - buildOpenCodePermissionConfig, buildOpenCodePermissionRuleset, mapToOpenCodePermissionReply, mapToOpenCodeTools, @@ -36,7 +35,7 @@ const OPENCODE_STREAM_ABORTED_MESSAGE = 'OpenCode execution aborted'; const OPENCODE_RETRY_MAX_ATTEMPTS = 3; const OPENCODE_RETRY_BASE_DELAY_MS = 250; const OPENCODE_INTERACTION_TIMEOUT_MS = 5000; -const OPENCODE_SERVER_START_TIMEOUT_MS = 30000; +const OPENCODE_SERVER_START_TIMEOUT_MS = 60000; const OPENCODE_RETRYABLE_ERROR_PATTERNS = [ 'stream disconnected before completion', 'transport error', @@ -47,8 +46,75 @@ const OPENCODE_RETRYABLE_ERROR_PATTERNS = [ 'eai_again', 'fetch failed', 'failed to start server on port', + 'timeout waiting for server', ]; +type OpencodeClient = Awaited>['client']; + +interface SharedServer { + client: OpencodeClient; + close: () => void; + model: string; + apiKey?: string; + queue: Array<(client: OpencodeClient) => void>; +} + +let sharedServer: SharedServer | null = null; +let initPromise: Promise | null = null; + +async function acquireClient(model: string, apiKey?: string, signal?: AbortSignal): Promise<{ client: OpencodeClient; release: () => void }> { + if (initPromise) { + await initPromise; + } + + if (sharedServer?.model === model && sharedServer.apiKey === apiKey) { + if (sharedServer.queue.length === 0) { + return { client: sharedServer.client, release: () => releaseClient() }; + } + return new Promise((resolve) => { + sharedServer!.queue.push((client) => resolve({ client, release: () => releaseClient() })); + }); + } + + sharedServer?.close(); + + let resolveInit: () => void; + initPromise = new Promise((resolve) => { resolveInit = resolve; }); + + try { + const port = await getFreePort(); + const { client, server } = await createOpencode({ + port, + signal, + config: { + model, + small_model: model, + ...(apiKey ? { provider: { opencode: { options: { apiKey } } } } : {}), + }, + timeout: OPENCODE_SERVER_START_TIMEOUT_MS, + }); + + sharedServer = { client, close: server.close, model, apiKey, queue: [] }; + log.debug('OpenCode server started', { model, port }); + + return { client, release: () => releaseClient() }; + } finally { + initPromise = null; + resolveInit!(); + } +} + +function releaseClient(): void { + if (!sharedServer) return; + const next = sharedServer.queue.shift(); + next?.(sharedServer.client); +} + +export function resetSharedServer(): void { + sharedServer?.close(); + sharedServer = null; +} + async function withTimeout( operation: (signal: AbortSignal) => Promise, timeoutMs: number, @@ -271,8 +337,8 @@ export class OpenCodeClient { const timeoutMessage = `OpenCode stream timed out after ${Math.floor(OPENCODE_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`; let abortCause: 'timeout' | 'external' | undefined; let diagRef: StreamDiagnostics | undefined; - let serverClose: (() => void) | undefined; - let opencodeApiClient: Awaited>['client'] | undefined; + let release: (() => void) | undefined; + let opencodeApiClient: OpencodeClient | undefined; let sessionId: string | undefined = options.sessionId; const interactionTimeoutMs = options.interactionTimeoutMs ?? OPENCODE_INTERACTION_TIMEOUT_MS; @@ -313,37 +379,24 @@ export class OpenCodeClient { const parsedModel = parseProviderModel(options.model, 'OpenCode model'); const fullModel = `${parsedModel.providerID}/${parsedModel.modelID}`; - const port = await getFreePort(); - const permission = buildOpenCodePermissionConfig(options.permissionMode, options.networkAccess); - const config = { - model: fullModel, - small_model: fullModel, - permission, - ...(options.opencodeApiKey - ? { provider: { opencode: { options: { apiKey: options.opencodeApiKey } } } } - : {}), - }; - const { client, server } = await createOpencode({ - port, - signal: streamAbortController.signal, - config, - timeout: OPENCODE_SERVER_START_TIMEOUT_MS, - }); - opencodeApiClient = client; - serverClose = server.close; + + const acquired = await acquireClient(fullModel, options.opencodeApiKey, streamAbortController.signal); + opencodeApiClient = acquired.client; + release = acquired.release; const sessionResult = sessionId ? { data: { id: sessionId } } - : await client.session.create({ + : await opencodeApiClient.session.create({ directory: options.cwd, permission: buildOpenCodePermissionRuleset(options.permissionMode, options.networkAccess), }); sessionId = sessionResult.data?.id; if (!sessionId) { + release(); throw new Error('Failed to create OpenCode session'); } - const { stream } = await client.event.subscribe( + const { stream } = await opencodeApiClient.event.subscribe( { directory: options.cwd }, { signal: streamAbortController.signal }, ); @@ -365,9 +418,8 @@ export class OpenCodeClient { }; } - // OpenCode SDK types do not yet expose outputFormat even though runtime accepts it. - const promptPayloadForSdk = promptPayload as unknown as Parameters[0]; - await client.session.promptAsync(promptPayloadForSdk, { + const promptPayloadForSdk = promptPayload as unknown as Parameters[0]; + await opencodeApiClient.session.promptAsync(promptPayloadForSdk, { signal: streamAbortController.signal, }); @@ -427,7 +479,7 @@ export class OpenCodeClient { ? mapToOpenCodePermissionReply(options.permissionMode) : 'once'; await withTimeout( - (signal) => client.permission.reply({ + (signal) => opencodeApiClient!.permission.reply({ requestID: permProps.id, directory: options.cwd, reply, @@ -450,7 +502,7 @@ export class OpenCodeClient { if (!options.onAskUserQuestion) { try { await withTimeout( - (signal) => client.question.reject({ + (signal) => opencodeApiClient!.question.reject({ requestID: questionProps.id, directory: options.cwd, }, { signal }), @@ -468,7 +520,7 @@ export class OpenCodeClient { try { const answers = await options.onAskUserQuestion(toQuestionInput(questionProps)); await withTimeout( - (signal) => client.question.reply({ + (signal) => opencodeApiClient!.question.reply({ requestID: questionProps.id, directory: options.cwd, answers: toQuestionAnswers(questionProps, answers), @@ -671,9 +723,7 @@ export class OpenCodeClient { clearTimeout(disposeTimeoutId); } } - if (serverClose) { - serverClose(); - } + release?.(); if (!streamAbortController.signal.aborted) { streamAbortController.abort(); }