OpenCode: サーバーシングルトン化で並列実行時の競合を解消

- 1つのサーバーを共有し、並列リクエストはキューで処理
- initPromiseで初期化中の競合を防止
- サーバー起動タイムアウトを30秒→60秒に延長
- 並列呼び出し/モデル変更時のテストを追加
This commit is contained in:
nrslib 2026-02-14 09:04:06 +09:00
parent 9cc6ac2ca7
commit eb593e3829
2 changed files with 167 additions and 80 deletions

View File

@ -57,8 +57,10 @@ vi.mock('@opencode-ai/sdk/v2', () => ({
}));
describe('OpenCodeClient stream cleanup', () => {
beforeEach(() => {
beforeEach(async () => {
vi.clearAllMocks();
const { resetSharedServer } = await import('../infra/opencode/client.js');
resetSharedServer();
});
it('should close SSE stream when session.idle is received', async () => {
@ -445,52 +447,6 @@ describe('OpenCodeClient stream cleanup', () => {
);
});
it('should configure allow permissions for edit mode', async () => {
const { OpenCodeClient } = await import('../infra/opencode/client.js');
const stream = new MockEventStream([
{
type: 'message.updated',
properties: {
info: {
sessionID: 'session-perm',
role: 'assistant',
time: { created: Date.now(), completed: Date.now() + 1 },
},
},
},
]);
const promptAsync = vi.fn().mockResolvedValue(undefined);
const sessionCreate = vi.fn().mockResolvedValue({ data: { id: 'session-perm' } });
const disposeInstance = vi.fn().mockResolvedValue({ data: {} });
const subscribe = vi.fn().mockResolvedValue({ stream });
createOpencodeMock.mockResolvedValue({
client: {
instance: { dispose: disposeInstance },
session: { create: sessionCreate, promptAsync },
event: { subscribe },
permission: { reply: vi.fn() },
},
server: { close: vi.fn() },
});
const client = new OpenCodeClient();
await client.call('coder', 'hello', {
cwd: '/tmp',
model: 'opencode/big-pickle',
permissionMode: 'edit',
});
const createCallArgs = createOpencodeMock.mock.calls[0]?.[0] as { config?: Record<string, unknown> };
const permission = createCallArgs.config?.permission as Record<string, string>;
expect(permission.read).toBe('allow');
expect(permission.edit).toBe('allow');
expect(permission.write).toBe('allow');
expect(permission.bash).toBe('allow');
expect(permission.question).toBe('deny');
});
it('should pass permission ruleset to session.create', async () => {
const { OpenCodeClient } = await import('../infra/opencode/client.js');
const stream = new MockEventStream([
@ -578,4 +534,85 @@ describe('OpenCodeClient stream cleanup', () => {
expect(result.status).toBe('error');
expect(result.content).toContain('permission reply timed out');
});
it('should reuse shared server for parallel calls with same config', async () => {
const { OpenCodeClient, resetSharedServer } = await import('../infra/opencode/client.js');
resetSharedServer();
let callCount = 0;
const sessionCreate = vi.fn().mockImplementation(() => {
callCount += 1;
return Promise.resolve({ data: { id: `session-${callCount}` } });
});
const promptAsync = vi.fn().mockResolvedValue(undefined);
const disposeInstance = vi.fn().mockResolvedValue({ data: {} });
const serverClose = vi.fn();
createOpencodeMock.mockResolvedValue({
client: {
instance: { dispose: disposeInstance },
session: { create: sessionCreate, promptAsync },
event: { subscribe: vi.fn().mockImplementation(() => {
const events = [{ type: 'session.idle', properties: { sessionID: `session-${callCount}` } }];
return Promise.resolve({ stream: new MockEventStream(events) });
}) },
permission: { reply: vi.fn() },
},
server: { close: serverClose },
});
const client = new OpenCodeClient();
const [result1, result2, result3] = await Promise.all([
client.call('coder', 'task1', { cwd: '/tmp', model: 'opencode/big-pickle' }),
client.call('coder', 'task2', { cwd: '/tmp', model: 'opencode/big-pickle' }),
client.call('coder', 'task3', { cwd: '/tmp', model: 'opencode/big-pickle' }),
]);
expect(createOpencodeMock).toHaveBeenCalledTimes(1);
expect(sessionCreate).toHaveBeenCalledTimes(3);
expect(result1.status).toBe('done');
expect(result2.status).toBe('done');
expect(result3.status).toBe('done');
expect(serverClose).not.toHaveBeenCalled();
});
it('should create new server when model changes', async () => {
const { OpenCodeClient, resetSharedServer } = await import('../infra/opencode/client.js');
resetSharedServer();
const sessionCreate = vi.fn().mockResolvedValue({ data: { id: 'session-1' } });
const promptAsync = vi.fn().mockResolvedValue(undefined);
const disposeInstance = vi.fn().mockResolvedValue({ data: {} });
const serverClose1 = vi.fn();
const serverClose2 = vi.fn();
createOpencodeMock.mockResolvedValueOnce({
client: {
instance: { dispose: disposeInstance },
session: { create: sessionCreate, promptAsync },
event: { subscribe: vi.fn().mockResolvedValue({ stream: new MockEventStream([{ type: 'session.idle', properties: { sessionID: 'session-1' } }]) }) },
permission: { reply: vi.fn() },
},
server: { close: serverClose1 },
}).mockResolvedValueOnce({
client: {
instance: { dispose: disposeInstance },
session: { create: sessionCreate, promptAsync },
event: { subscribe: vi.fn().mockResolvedValue({ stream: new MockEventStream([{ type: 'session.idle', properties: { sessionID: 'session-2' } }]) }) },
permission: { reply: vi.fn() },
},
server: { close: serverClose2 },
});
const client = new OpenCodeClient();
const result1 = await client.call('coder', 'task1', { cwd: '/tmp', model: 'opencode/model-a' });
const result2 = await client.call('coder', 'task2', { cwd: '/tmp', model: 'opencode/model-b' });
expect(createOpencodeMock).toHaveBeenCalledTimes(2);
expect(serverClose1).toHaveBeenCalled();
expect(result1.status).toBe('done');
expect(result2.status).toBe('done');
});
});

View File

@ -11,7 +11,6 @@ import type { AgentResponse } from '../../core/models/index.js';
import { createLogger, getErrorMessage, createStreamDiagnostics, parseStructuredOutput, type StreamDiagnostics } from '../../shared/utils/index.js';
import { parseProviderModel } from '../../shared/utils/providerModel.js';
import {
buildOpenCodePermissionConfig,
buildOpenCodePermissionRuleset,
mapToOpenCodePermissionReply,
mapToOpenCodeTools,
@ -36,7 +35,7 @@ const OPENCODE_STREAM_ABORTED_MESSAGE = 'OpenCode execution aborted';
const OPENCODE_RETRY_MAX_ATTEMPTS = 3;
const OPENCODE_RETRY_BASE_DELAY_MS = 250;
const OPENCODE_INTERACTION_TIMEOUT_MS = 5000;
const OPENCODE_SERVER_START_TIMEOUT_MS = 30000;
const OPENCODE_SERVER_START_TIMEOUT_MS = 60000;
const OPENCODE_RETRYABLE_ERROR_PATTERNS = [
'stream disconnected before completion',
'transport error',
@ -47,8 +46,75 @@ const OPENCODE_RETRYABLE_ERROR_PATTERNS = [
'eai_again',
'fetch failed',
'failed to start server on port',
'timeout waiting for server',
];
type OpencodeClient = Awaited<ReturnType<typeof createOpencode>>['client'];
interface SharedServer {
client: OpencodeClient;
close: () => void;
model: string;
apiKey?: string;
queue: Array<(client: OpencodeClient) => void>;
}
let sharedServer: SharedServer | null = null;
let initPromise: Promise<void> | null = null;
async function acquireClient(model: string, apiKey?: string, signal?: AbortSignal): Promise<{ client: OpencodeClient; release: () => void }> {
if (initPromise) {
await initPromise;
}
if (sharedServer?.model === model && sharedServer.apiKey === apiKey) {
if (sharedServer.queue.length === 0) {
return { client: sharedServer.client, release: () => releaseClient() };
}
return new Promise((resolve) => {
sharedServer!.queue.push((client) => resolve({ client, release: () => releaseClient() }));
});
}
sharedServer?.close();
let resolveInit: () => void;
initPromise = new Promise((resolve) => { resolveInit = resolve; });
try {
const port = await getFreePort();
const { client, server } = await createOpencode({
port,
signal,
config: {
model,
small_model: model,
...(apiKey ? { provider: { opencode: { options: { apiKey } } } } : {}),
},
timeout: OPENCODE_SERVER_START_TIMEOUT_MS,
});
sharedServer = { client, close: server.close, model, apiKey, queue: [] };
log.debug('OpenCode server started', { model, port });
return { client, release: () => releaseClient() };
} finally {
initPromise = null;
resolveInit!();
}
}
function releaseClient(): void {
if (!sharedServer) return;
const next = sharedServer.queue.shift();
next?.(sharedServer.client);
}
export function resetSharedServer(): void {
sharedServer?.close();
sharedServer = null;
}
async function withTimeout<T>(
operation: (signal: AbortSignal) => Promise<T>,
timeoutMs: number,
@ -271,8 +337,8 @@ export class OpenCodeClient {
const timeoutMessage = `OpenCode stream timed out after ${Math.floor(OPENCODE_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`;
let abortCause: 'timeout' | 'external' | undefined;
let diagRef: StreamDiagnostics | undefined;
let serverClose: (() => void) | undefined;
let opencodeApiClient: Awaited<ReturnType<typeof createOpencode>>['client'] | undefined;
let release: (() => void) | undefined;
let opencodeApiClient: OpencodeClient | undefined;
let sessionId: string | undefined = options.sessionId;
const interactionTimeoutMs = options.interactionTimeoutMs ?? OPENCODE_INTERACTION_TIMEOUT_MS;
@ -313,37 +379,24 @@ export class OpenCodeClient {
const parsedModel = parseProviderModel(options.model, 'OpenCode model');
const fullModel = `${parsedModel.providerID}/${parsedModel.modelID}`;
const port = await getFreePort();
const permission = buildOpenCodePermissionConfig(options.permissionMode, options.networkAccess);
const config = {
model: fullModel,
small_model: fullModel,
permission,
...(options.opencodeApiKey
? { provider: { opencode: { options: { apiKey: options.opencodeApiKey } } } }
: {}),
};
const { client, server } = await createOpencode({
port,
signal: streamAbortController.signal,
config,
timeout: OPENCODE_SERVER_START_TIMEOUT_MS,
});
opencodeApiClient = client;
serverClose = server.close;
const acquired = await acquireClient(fullModel, options.opencodeApiKey, streamAbortController.signal);
opencodeApiClient = acquired.client;
release = acquired.release;
const sessionResult = sessionId
? { data: { id: sessionId } }
: await client.session.create({
: await opencodeApiClient.session.create({
directory: options.cwd,
permission: buildOpenCodePermissionRuleset(options.permissionMode, options.networkAccess),
});
sessionId = sessionResult.data?.id;
if (!sessionId) {
release();
throw new Error('Failed to create OpenCode session');
}
const { stream } = await client.event.subscribe(
const { stream } = await opencodeApiClient.event.subscribe(
{ directory: options.cwd },
{ signal: streamAbortController.signal },
);
@ -365,9 +418,8 @@ export class OpenCodeClient {
};
}
// OpenCode SDK types do not yet expose outputFormat even though runtime accepts it.
const promptPayloadForSdk = promptPayload as unknown as Parameters<typeof client.session.promptAsync>[0];
await client.session.promptAsync(promptPayloadForSdk, {
const promptPayloadForSdk = promptPayload as unknown as Parameters<typeof opencodeApiClient.session.promptAsync>[0];
await opencodeApiClient.session.promptAsync(promptPayloadForSdk, {
signal: streamAbortController.signal,
});
@ -427,7 +479,7 @@ export class OpenCodeClient {
? mapToOpenCodePermissionReply(options.permissionMode)
: 'once';
await withTimeout(
(signal) => client.permission.reply({
(signal) => opencodeApiClient!.permission.reply({
requestID: permProps.id,
directory: options.cwd,
reply,
@ -450,7 +502,7 @@ export class OpenCodeClient {
if (!options.onAskUserQuestion) {
try {
await withTimeout(
(signal) => client.question.reject({
(signal) => opencodeApiClient!.question.reject({
requestID: questionProps.id,
directory: options.cwd,
}, { signal }),
@ -468,7 +520,7 @@ export class OpenCodeClient {
try {
const answers = await options.onAskUserQuestion(toQuestionInput(questionProps));
await withTimeout(
(signal) => client.question.reply({
(signal) => opencodeApiClient!.question.reply({
requestID: questionProps.id,
directory: options.cwd,
answers: toQuestionAnswers(questionProps, answers),
@ -671,9 +723,7 @@ export class OpenCodeClient {
clearTimeout(disposeTimeoutId);
}
}
if (serverClose) {
serverClose();
}
release?.();
if (!streamAbortController.signal.aborted) {
streamAbortController.abort();
}