opencode がハングする問題を修正

This commit is contained in:
nrslib 2026-02-11 09:48:05 +09:00
parent 1e4182b0eb
commit c42799739e
8 changed files with 406 additions and 26 deletions

View File

@ -41,7 +41,7 @@ vi.mock('../features/tasks/index.js', () => ({
selectAndExecuteTask: vi.fn(), selectAndExecuteTask: vi.fn(),
determinePiece: vi.fn(), determinePiece: vi.fn(),
saveTaskFromInteractive: vi.fn(), saveTaskFromInteractive: vi.fn(),
createIssueAndSaveTask: vi.fn(), createIssueFromTask: vi.fn(),
})); }));
vi.mock('../features/pipeline/index.js', () => ({ vi.mock('../features/pipeline/index.js', () => ({
@ -89,7 +89,7 @@ vi.mock('../app/cli/helpers.js', () => ({
})); }));
import { checkGhCli, fetchIssue, formatIssueAsTask, parseIssueNumbers } from '../infra/github/issue.js'; import { checkGhCli, fetchIssue, formatIssueAsTask, parseIssueNumbers } from '../infra/github/issue.js';
import { selectAndExecuteTask, determinePiece, createIssueAndSaveTask } from '../features/tasks/index.js'; import { selectAndExecuteTask, determinePiece, createIssueFromTask, saveTaskFromInteractive } from '../features/tasks/index.js';
import { interactiveMode, selectRecentSession } from '../features/interactive/index.js'; import { interactiveMode, selectRecentSession } from '../features/interactive/index.js';
import { loadGlobalConfig } from '../infra/config/index.js'; import { loadGlobalConfig } from '../infra/config/index.js';
import { confirm } from '../shared/prompt/index.js'; import { confirm } from '../shared/prompt/index.js';
@ -103,7 +103,8 @@ const mockFormatIssueAsTask = vi.mocked(formatIssueAsTask);
const mockParseIssueNumbers = vi.mocked(parseIssueNumbers); const mockParseIssueNumbers = vi.mocked(parseIssueNumbers);
const mockSelectAndExecuteTask = vi.mocked(selectAndExecuteTask); const mockSelectAndExecuteTask = vi.mocked(selectAndExecuteTask);
const mockDeterminePiece = vi.mocked(determinePiece); const mockDeterminePiece = vi.mocked(determinePiece);
const mockCreateIssueAndSaveTask = vi.mocked(createIssueAndSaveTask); const mockCreateIssueFromTask = vi.mocked(createIssueFromTask);
const mockSaveTaskFromInteractive = vi.mocked(saveTaskFromInteractive);
const mockInteractiveMode = vi.mocked(interactiveMode); const mockInteractiveMode = vi.mocked(interactiveMode);
const mockSelectRecentSession = vi.mocked(selectRecentSession); const mockSelectRecentSession = vi.mocked(selectRecentSession);
const mockLoadGlobalConfig = vi.mocked(loadGlobalConfig); const mockLoadGlobalConfig = vi.mocked(loadGlobalConfig);
@ -280,38 +281,41 @@ describe('Issue resolution in routing', () => {
}); });
describe('create_issue action', () => { describe('create_issue action', () => {
it('should delegate to createIssueAndSaveTask with cwd, task, and pieceId when confirmed', async () => { it('should create issue first, then delegate final confirmation to saveTaskFromInteractive', async () => {
// Given // Given
mockInteractiveMode.mockResolvedValue({ action: 'create_issue', task: 'New feature request' }); mockInteractiveMode.mockResolvedValue({ action: 'create_issue', task: 'New feature request' });
mockConfirm.mockResolvedValue(true); mockCreateIssueFromTask.mockReturnValue(226);
// When // When
await executeDefaultAction(); await executeDefaultAction();
// Then: createIssueAndSaveTask should be called with correct args // Then: issue is created first
expect(mockCreateIssueAndSaveTask).toHaveBeenCalledWith( expect(mockCreateIssueFromTask).toHaveBeenCalledWith('New feature request');
// Then: saveTaskFromInteractive receives final confirmation message
expect(mockSaveTaskFromInteractive).toHaveBeenCalledWith(
'/test/cwd', '/test/cwd',
'New feature request', 'New feature request',
'default', 'default',
{ issue: 226, confirmAtEndMessage: 'Add this issue to tasks?' },
); );
}); });
it('should skip createIssueAndSaveTask when not confirmed', async () => { it('should skip confirmation and task save when issue creation fails', async () => {
// Given // Given
mockInteractiveMode.mockResolvedValue({ action: 'create_issue', task: 'New feature request' }); mockInteractiveMode.mockResolvedValue({ action: 'create_issue', task: 'New feature request' });
mockConfirm.mockResolvedValue(false); mockCreateIssueFromTask.mockReturnValue(undefined);
// When // When
await executeDefaultAction(); await executeDefaultAction();
// Then: task should not be added when user declines // Then
expect(mockCreateIssueAndSaveTask).not.toHaveBeenCalled(); expect(mockCreateIssueFromTask).toHaveBeenCalledWith('New feature request');
expect(mockSaveTaskFromInteractive).not.toHaveBeenCalled();
}); });
it('should not call selectAndExecuteTask when create_issue action is chosen', async () => { it('should not call selectAndExecuteTask when create_issue action is chosen', async () => {
// Given // Given
mockInteractiveMode.mockResolvedValue({ action: 'create_issue', task: 'New feature request' }); mockInteractiveMode.mockResolvedValue({ action: 'create_issue', task: 'New feature request' });
mockConfirm.mockResolvedValue(true);
// When // When
await executeDefaultAction(); await executeDefaultAction();

View File

@ -0,0 +1,248 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
class MockEventStream implements AsyncGenerator<unknown, void, unknown> {
private index = 0;
private readonly events: unknown[];
readonly returnSpy = vi.fn(async () => ({ done: true as const, value: undefined }));
constructor(events: unknown[]) {
this.events = events;
}
[Symbol.asyncIterator](): AsyncGenerator<unknown, void, unknown> {
return this;
}
async next(): Promise<IteratorResult<unknown, void>> {
if (this.index >= this.events.length) {
return { done: true, value: undefined };
}
const value = this.events[this.index];
this.index += 1;
return { done: false, value };
}
async return(): Promise<IteratorResult<unknown, void>> {
return this.returnSpy();
}
async throw(e?: unknown): Promise<IteratorResult<unknown, void>> {
throw e;
}
}
class HangingAfterEventsStream implements AsyncGenerator<unknown, void, unknown> {
private index = 0;
private closed = false;
private pendingResolve: ((value: IteratorResult<unknown, void>) => void) | undefined;
readonly returnSpy = vi.fn(async () => {
this.closed = true;
this.pendingResolve?.({ done: true, value: undefined });
return { done: true as const, value: undefined };
});
constructor(private readonly events: unknown[]) {}
[Symbol.asyncIterator](): AsyncGenerator<unknown, void, unknown> {
return this;
}
async next(): Promise<IteratorResult<unknown, void>> {
if (this.closed) {
return { done: true, value: undefined };
}
if (this.index < this.events.length) {
const value = this.events[this.index];
this.index += 1;
return { done: false, value };
}
return new Promise<IteratorResult<unknown, void>>((resolve) => {
this.pendingResolve = resolve;
});
}
async return(): Promise<IteratorResult<unknown, void>> {
return this.returnSpy();
}
async throw(e?: unknown): Promise<IteratorResult<unknown, void>> {
throw e;
}
}
const { createOpencodeMock } = vi.hoisted(() => ({
createOpencodeMock: vi.fn(),
}));
vi.mock('node:net', () => ({
createServer: () => {
const handlers = new Map<string, (...args: unknown[]) => void>();
return {
unref: vi.fn(),
on: vi.fn((event: string, handler: (...args: unknown[]) => void) => {
handlers.set(event, handler);
}),
listen: vi.fn((_port: number, _host: string, cb: () => void) => {
cb();
}),
address: vi.fn(() => ({ port: 62000 })),
close: vi.fn((cb?: (err?: Error) => void) => cb?.()),
};
},
}));
vi.mock('@opencode-ai/sdk/v2', () => ({
createOpencode: createOpencodeMock,
}));
describe('OpenCodeClient stream cleanup', () => {
beforeEach(() => {
vi.clearAllMocks();
});
it('should close SSE stream when session.idle is received', async () => {
const { OpenCodeClient } = await import('../infra/opencode/client.js');
const stream = new MockEventStream([
{
type: 'session.idle',
properties: { sessionID: 'session-1' },
},
]);
const promptAsync = vi.fn().mockResolvedValue(undefined);
const sessionCreate = vi.fn().mockResolvedValue({ data: { id: 'session-1' } });
const disposeInstance = vi.fn().mockResolvedValue({ data: {} });
const subscribe = vi.fn().mockResolvedValue({ stream });
createOpencodeMock.mockResolvedValue({
client: {
instance: { dispose: disposeInstance },
session: { create: sessionCreate, promptAsync },
event: { subscribe },
permission: { reply: vi.fn() },
},
server: { close: vi.fn() },
});
const client = new OpenCodeClient();
const result = await client.call('interactive', 'hello', {
cwd: '/tmp',
model: 'opencode/big-pickle',
});
expect(result.status).toBe('done');
expect(stream.returnSpy).toHaveBeenCalled();
expect(disposeInstance).toHaveBeenCalledWith(
{ directory: '/tmp' },
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
expect(subscribe).toHaveBeenCalledWith(
{ directory: '/tmp' },
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
});
it('should close SSE stream when session.error is received', async () => {
const { OpenCodeClient } = await import('../infra/opencode/client.js');
const stream = new MockEventStream([
{
type: 'session.error',
properties: {
sessionID: 'session-2',
error: { name: 'Error', data: { message: 'boom' } },
},
},
]);
const promptAsync = vi.fn().mockResolvedValue(undefined);
const sessionCreate = vi.fn().mockResolvedValue({ data: { id: 'session-2' } });
const disposeInstance = vi.fn().mockResolvedValue({ data: {} });
const subscribe = vi.fn().mockResolvedValue({ stream });
createOpencodeMock.mockResolvedValue({
client: {
instance: { dispose: disposeInstance },
session: { create: sessionCreate, promptAsync },
event: { subscribe },
permission: { reply: vi.fn() },
},
server: { close: vi.fn() },
});
const client = new OpenCodeClient();
const result = await client.call('interactive', 'hello', {
cwd: '/tmp',
model: 'opencode/big-pickle',
});
expect(result.status).toBe('error');
expect(result.content).toContain('boom');
expect(stream.returnSpy).toHaveBeenCalled();
expect(disposeInstance).toHaveBeenCalledWith(
{ directory: '/tmp' },
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
expect(subscribe).toHaveBeenCalledWith(
{ directory: '/tmp' },
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
});
it('should complete without hanging when assistant message is completed', async () => {
const { OpenCodeClient } = await import('../infra/opencode/client.js');
const stream = new HangingAfterEventsStream([
{
type: 'message.part.updated',
properties: {
part: { id: 'p-1', type: 'text', text: 'done' },
delta: 'done',
},
},
{
type: 'message.updated',
properties: {
info: {
sessionID: 'session-3',
role: 'assistant',
time: { created: Date.now(), completed: Date.now() + 1 },
},
},
},
]);
const promptAsync = vi.fn().mockResolvedValue(undefined);
const sessionCreate = vi.fn().mockResolvedValue({ data: { id: 'session-3' } });
const disposeInstance = vi.fn().mockResolvedValue({ data: {} });
const subscribe = vi.fn().mockResolvedValue({ stream });
createOpencodeMock.mockResolvedValue({
client: {
instance: { dispose: disposeInstance },
session: { create: sessionCreate, promptAsync },
event: { subscribe },
permission: { reply: vi.fn() },
},
server: { close: vi.fn() },
});
const client = new OpenCodeClient();
const result = await Promise.race([
client.call('interactive', 'hello', {
cwd: '/tmp',
model: 'opencode/big-pickle',
}),
new Promise<never>((_, reject) => setTimeout(() => reject(new Error('timed out')), 500)),
]);
expect(result.status).toBe('done');
expect(result.content).toBe('done');
expect(disposeInstance).toHaveBeenCalledWith(
{ directory: '/tmp' },
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
expect(subscribe).toHaveBeenCalledWith(
{ directory: '/tmp' },
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
});
});

View File

@ -35,6 +35,13 @@ import { executeDefaultAction } from './routing.js';
// Normal parsing for all other cases (including '#' prefixed inputs) // Normal parsing for all other cases (including '#' prefixed inputs)
await program.parseAsync(); await program.parseAsync();
// Some providers/SDKs may leave active handles even after command completion.
// Keep only watch mode as a long-running command; all others should exit explicitly.
const rootArg = process.argv.slice(2)[0];
if (rootArg !== 'watch') {
process.exit(0);
}
})().catch((err) => { })().catch((err) => {
console.error(err); console.error(err);
process.exit(1); process.exit(1);

View File

@ -6,11 +6,10 @@
*/ */
import { info, error, withProgress } from '../../shared/ui/index.js'; import { info, error, withProgress } from '../../shared/ui/index.js';
import { confirm } from '../../shared/prompt/index.js';
import { getErrorMessage } from '../../shared/utils/index.js'; import { getErrorMessage } from '../../shared/utils/index.js';
import { getLabel } from '../../shared/i18n/index.js'; import { getLabel } from '../../shared/i18n/index.js';
import { fetchIssue, formatIssueAsTask, checkGhCli, parseIssueNumbers, type GitHubIssue } from '../../infra/github/index.js'; import { fetchIssue, formatIssueAsTask, checkGhCli, parseIssueNumbers, type GitHubIssue } from '../../infra/github/index.js';
import { selectAndExecuteTask, determinePiece, saveTaskFromInteractive, createIssueAndSaveTask, type SelectAndExecuteOptions } from '../../features/tasks/index.js'; import { selectAndExecuteTask, determinePiece, saveTaskFromInteractive, createIssueFromTask, type SelectAndExecuteOptions } from '../../features/tasks/index.js';
import { executePipeline } from '../../features/pipeline/index.js'; import { executePipeline } from '../../features/pipeline/index.js';
import { import {
interactiveMode, interactiveMode,
@ -205,8 +204,14 @@ export async function executeDefaultAction(task?: string): Promise<void> {
break; break;
case 'create_issue': case 'create_issue':
if (await confirm('Add this issue to tasks?', true)) { {
await createIssueAndSaveTask(resolvedCwd, result.task, pieceId); const issueNumber = createIssueFromTask(result.task);
if (issueNumber !== undefined) {
await saveTaskFromInteractive(resolvedCwd, result.task, pieceId, {
issue: issueNumber,
confirmAtEndMessage: 'Add this issue to tasks?',
});
}
} }
break; break;

View File

@ -149,9 +149,15 @@ export async function saveTaskFromInteractive(
cwd: string, cwd: string,
task: string, task: string,
piece?: string, piece?: string,
options?: { issue?: number }, options?: { issue?: number; confirmAtEndMessage?: string },
): Promise<void> { ): Promise<void> {
const settings = await promptWorktreeSettings(); const settings = await promptWorktreeSettings();
if (options?.confirmAtEndMessage) {
const approved = await confirm(options.confirmAtEndMessage, true);
if (!approved) {
return;
}
}
const created = await saveTaskFile(cwd, task, { piece, issue: options?.issue, ...settings }); const created = await saveTaskFile(cwd, task, { piece, issue: options?.issue, ...settings });
displayTaskCreationResult(created, settings, piece); displayTaskCreationResult(created, settings, piece);
} }

View File

@ -47,6 +47,14 @@ export interface OpenCodeSessionIdleEvent {
properties: { sessionID: string }; properties: { sessionID: string };
} }
export interface OpenCodeSessionStatusEvent {
type: 'session.status';
properties: {
sessionID: string;
status: { type: 'idle' | 'busy' | 'retry'; attempt?: number; message?: string; next?: number };
};
}
export interface OpenCodeSessionErrorEvent { export interface OpenCodeSessionErrorEvent {
type: 'session.error'; type: 'session.error';
properties: { properties: {
@ -55,6 +63,18 @@ export interface OpenCodeSessionErrorEvent {
}; };
} }
export interface OpenCodeMessageUpdatedEvent {
type: 'message.updated';
properties: {
info: {
sessionID: string;
role: 'assistant' | 'user';
time?: { created?: number; completed?: number };
error?: unknown;
};
};
}
export interface OpenCodePermissionAskedEvent { export interface OpenCodePermissionAskedEvent {
type: 'permission.asked'; type: 'permission.asked';
properties: { properties: {
@ -69,6 +89,8 @@ export interface OpenCodePermissionAskedEvent {
export type OpenCodeStreamEvent = export type OpenCodeStreamEvent =
| OpenCodeMessagePartUpdatedEvent | OpenCodeMessagePartUpdatedEvent
| OpenCodeMessageUpdatedEvent
| OpenCodeSessionStatusEvent
| OpenCodeSessionIdleEvent | OpenCodeSessionIdleEvent
| OpenCodeSessionErrorEvent | OpenCodeSessionErrorEvent
| OpenCodePermissionAskedEvent | OpenCodePermissionAskedEvent

View File

@ -41,6 +41,23 @@ const OPENCODE_RETRYABLE_ERROR_PATTERNS = [
'failed to start server on port', 'failed to start server on port',
]; ];
function extractOpenCodeErrorMessage(error: unknown): string | undefined {
if (!error || typeof error !== 'object') {
return undefined;
}
const value = error as { message?: unknown; data?: { message?: unknown }; name?: unknown };
if (typeof value.message === 'string' && value.message.length > 0) {
return value.message;
}
if (typeof value.data?.message === 'string' && value.data.message.length > 0) {
return value.data.message;
}
if (typeof value.name === 'string' && value.name.length > 0) {
return value.name;
}
return undefined;
}
function getCommonPrefixLength(a: string, b: string): number { function getCommonPrefixLength(a: string, b: string): number {
const max = Math.min(a.length, b.length); const max = Math.min(a.length, b.length);
let i = 0; let i = 0;
@ -149,6 +166,7 @@ export class OpenCodeClient {
const timeoutMessage = `OpenCode stream timed out after ${Math.floor(OPENCODE_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`; const timeoutMessage = `OpenCode stream timed out after ${Math.floor(OPENCODE_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`;
let abortCause: 'timeout' | 'external' | undefined; let abortCause: 'timeout' | 'external' | undefined;
let serverClose: (() => void) | undefined; let serverClose: (() => void) | undefined;
let opencodeApiClient: Awaited<ReturnType<typeof createOpencode>>['client'] | undefined;
const resetIdleTimeout = (): void => { const resetIdleTimeout = (): void => {
if (idleTimeoutId !== undefined) { if (idleTimeoutId !== undefined) {
@ -196,6 +214,7 @@ export class OpenCodeClient {
signal: streamAbortController.signal, signal: streamAbortController.signal,
config, config,
}); });
opencodeApiClient = client;
serverClose = server.close; serverClose = server.close;
const sessionResult = options.sessionId const sessionResult = options.sessionId
@ -206,16 +225,21 @@ export class OpenCodeClient {
if (!sessionId) { if (!sessionId) {
throw new Error('Failed to create OpenCode session'); throw new Error('Failed to create OpenCode session');
} }
const { stream } = await client.event.subscribe(
const { stream } = await client.event.subscribe({ directory: options.cwd }); { directory: options.cwd },
{ signal: streamAbortController.signal },
);
resetIdleTimeout(); resetIdleTimeout();
await client.session.promptAsync({ await client.session.promptAsync(
{
sessionID: sessionId, sessionID: sessionId,
directory: options.cwd, directory: options.cwd,
model: parsedModel, model: parsedModel,
parts: [{ type: 'text' as const, text: fullPrompt }], parts: [{ type: 'text' as const, text: fullPrompt }],
}); },
{ signal: streamAbortController.signal },
);
emitInit(options.onStream, options.model, sessionId); emitInit(options.onStream, options.model, sessionId);
@ -232,7 +256,6 @@ export class OpenCodeClient {
resetIdleTimeout(); resetIdleTimeout();
const sseEvent = event as OpenCodeStreamEvent; const sseEvent = event as OpenCodeStreamEvent;
if (sseEvent.type === 'message.part.updated') { if (sseEvent.type === 'message.part.updated') {
const props = sseEvent.properties as { part: OpenCodePart; delta?: string }; const props = sseEvent.properties as { part: OpenCodePart; delta?: string };
const part = props.part; const part = props.part;
@ -279,6 +302,40 @@ export class OpenCodeClient {
continue; continue;
} }
if (sseEvent.type === 'message.updated') {
const messageProps = sseEvent.properties as {
info?: {
sessionID?: string;
role?: 'assistant' | 'user';
time?: { completed?: number };
error?: unknown;
};
};
const info = messageProps.info;
const isCurrentAssistantMessage = info?.sessionID === sessionId && info.role === 'assistant';
const isCompleted = typeof info?.time?.completed === 'number';
if (isCurrentAssistantMessage && isCompleted) {
const streamError = extractOpenCodeErrorMessage(info.error);
if (streamError) {
success = false;
failureMessage = streamError;
}
break;
}
continue;
}
if (sseEvent.type === 'session.status') {
const statusProps = sseEvent.properties as {
sessionID?: string;
status?: { type?: string };
};
if (statusProps.sessionID === sessionId && statusProps.status?.type === 'idle') {
break;
}
continue;
}
if (sseEvent.type === 'session.idle') { if (sseEvent.type === 'session.idle') {
const idleProps = sseEvent.properties as { sessionID: string }; const idleProps = sseEvent.properties as { sessionID: string };
if (idleProps.sessionID === sessionId) { if (idleProps.sessionID === sessionId) {
@ -365,9 +422,28 @@ export class OpenCodeClient {
if (options.abortSignal) { if (options.abortSignal) {
options.abortSignal.removeEventListener('abort', onExternalAbort); options.abortSignal.removeEventListener('abort', onExternalAbort);
} }
if (opencodeApiClient) {
const disposeAbortController = new AbortController();
const disposeTimeoutId = setTimeout(() => {
disposeAbortController.abort();
}, 3000);
try {
await opencodeApiClient.instance.dispose(
{ directory: options.cwd },
{ signal: disposeAbortController.signal },
);
} catch {
// Ignore dispose errors during cleanup.
} finally {
clearTimeout(disposeTimeoutId);
}
}
if (serverClose) { if (serverClose) {
serverClose(); serverClose();
} }
if (!streamAbortController.signal.aborted) {
streamAbortController.abort();
}
} }
} }

View File

@ -9,6 +9,16 @@ import * as readline from 'node:readline';
import chalk from 'chalk'; import chalk from 'chalk';
import { resolveTtyPolicy, assertTtyIfForced } from './tty.js'; import { resolveTtyPolicy, assertTtyIfForced } from './tty.js';
function pauseStdinSafely(): void {
try {
if (process.stdin.readable && !process.stdin.destroyed) {
process.stdin.pause();
}
} catch {
// Ignore stdin state errors during prompt cleanup.
}
}
/** /**
* Prompt user for simple text input * Prompt user for simple text input
* @returns User input or null if cancelled * @returns User input or null if cancelled
@ -27,6 +37,7 @@ export async function promptInput(message: string): Promise<string | null> {
return new Promise((resolve) => { return new Promise((resolve) => {
rl.question(chalk.green(message + ': '), (answer) => { rl.question(chalk.green(message + ': '), (answer) => {
rl.close(); rl.close();
pauseStdinSafely();
const trimmed = answer.trim(); const trimmed = answer.trim();
if (!trimmed) { if (!trimmed) {
@ -98,6 +109,7 @@ export async function confirm(message: string, defaultYes = true): Promise<boole
return new Promise((resolve) => { return new Promise((resolve) => {
rl.question(chalk.green(`${message} ${hint}: `), (answer) => { rl.question(chalk.green(`${message} ${hint}: `), (answer) => {
rl.close(); rl.close();
pauseStdinSafely();
const trimmed = answer.trim().toLowerCase(); const trimmed = answer.trim().toLowerCase();