From c42799739eb933e8f2320d28dd5f00d2d031cf09 Mon Sep 17 00:00:00 2001 From: nrslib <38722970+nrslib@users.noreply.github.com> Date: Wed, 11 Feb 2026 09:48:05 +0900 Subject: [PATCH] =?UTF-8?q?opencode=20=E3=81=8C=E3=83=8F=E3=83=B3=E3=82=B0?= =?UTF-8?q?=E3=81=99=E3=82=8B=E5=95=8F=E9=A1=8C=E3=82=92=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cli-routing-issue-resolve.test.ts | 28 +- src/__tests__/opencode-client-cleanup.test.ts | 248 ++++++++++++++++++ src/app/cli/index.ts | 7 + src/app/cli/routing.ts | 13 +- src/features/tasks/add/index.ts | 8 +- src/infra/opencode/OpenCodeStreamHandler.ts | 22 ++ src/infra/opencode/client.ts | 94 ++++++- src/shared/prompt/confirm.ts | 12 + 8 files changed, 406 insertions(+), 26 deletions(-) create mode 100644 src/__tests__/opencode-client-cleanup.test.ts diff --git a/src/__tests__/cli-routing-issue-resolve.test.ts b/src/__tests__/cli-routing-issue-resolve.test.ts index 74c3de9..522b7e9 100644 --- a/src/__tests__/cli-routing-issue-resolve.test.ts +++ b/src/__tests__/cli-routing-issue-resolve.test.ts @@ -41,7 +41,7 @@ vi.mock('../features/tasks/index.js', () => ({ selectAndExecuteTask: vi.fn(), determinePiece: vi.fn(), saveTaskFromInteractive: vi.fn(), - createIssueAndSaveTask: vi.fn(), + createIssueFromTask: vi.fn(), })); vi.mock('../features/pipeline/index.js', () => ({ @@ -89,7 +89,7 @@ vi.mock('../app/cli/helpers.js', () => ({ })); import { checkGhCli, fetchIssue, formatIssueAsTask, parseIssueNumbers } from '../infra/github/issue.js'; -import { selectAndExecuteTask, determinePiece, createIssueAndSaveTask } from '../features/tasks/index.js'; +import { selectAndExecuteTask, determinePiece, createIssueFromTask, saveTaskFromInteractive } from '../features/tasks/index.js'; import { interactiveMode, selectRecentSession } from '../features/interactive/index.js'; import { loadGlobalConfig } from '../infra/config/index.js'; import { confirm } from '../shared/prompt/index.js'; @@ -103,7 +103,8 @@ const mockFormatIssueAsTask = vi.mocked(formatIssueAsTask); const mockParseIssueNumbers = vi.mocked(parseIssueNumbers); const mockSelectAndExecuteTask = vi.mocked(selectAndExecuteTask); const mockDeterminePiece = vi.mocked(determinePiece); -const mockCreateIssueAndSaveTask = vi.mocked(createIssueAndSaveTask); +const mockCreateIssueFromTask = vi.mocked(createIssueFromTask); +const mockSaveTaskFromInteractive = vi.mocked(saveTaskFromInteractive); const mockInteractiveMode = vi.mocked(interactiveMode); const mockSelectRecentSession = vi.mocked(selectRecentSession); const mockLoadGlobalConfig = vi.mocked(loadGlobalConfig); @@ -280,38 +281,41 @@ describe('Issue resolution in routing', () => { }); describe('create_issue action', () => { - it('should delegate to createIssueAndSaveTask with cwd, task, and pieceId when confirmed', async () => { + it('should create issue first, then delegate final confirmation to saveTaskFromInteractive', async () => { // Given mockInteractiveMode.mockResolvedValue({ action: 'create_issue', task: 'New feature request' }); - mockConfirm.mockResolvedValue(true); + mockCreateIssueFromTask.mockReturnValue(226); // When await executeDefaultAction(); - // Then: createIssueAndSaveTask should be called with correct args - expect(mockCreateIssueAndSaveTask).toHaveBeenCalledWith( + // Then: issue is created first + expect(mockCreateIssueFromTask).toHaveBeenCalledWith('New feature request'); + // Then: saveTaskFromInteractive receives final confirmation message + expect(mockSaveTaskFromInteractive).toHaveBeenCalledWith( '/test/cwd', 'New feature request', 'default', + { issue: 226, confirmAtEndMessage: 'Add this issue to tasks?' }, ); }); - it('should skip createIssueAndSaveTask when not confirmed', async () => { + it('should skip confirmation and task save when issue creation fails', async () => { // Given mockInteractiveMode.mockResolvedValue({ action: 'create_issue', task: 'New feature request' }); - mockConfirm.mockResolvedValue(false); + mockCreateIssueFromTask.mockReturnValue(undefined); // When await executeDefaultAction(); - // Then: task should not be added when user declines - expect(mockCreateIssueAndSaveTask).not.toHaveBeenCalled(); + // Then + expect(mockCreateIssueFromTask).toHaveBeenCalledWith('New feature request'); + expect(mockSaveTaskFromInteractive).not.toHaveBeenCalled(); }); it('should not call selectAndExecuteTask when create_issue action is chosen', async () => { // Given mockInteractiveMode.mockResolvedValue({ action: 'create_issue', task: 'New feature request' }); - mockConfirm.mockResolvedValue(true); // When await executeDefaultAction(); diff --git a/src/__tests__/opencode-client-cleanup.test.ts b/src/__tests__/opencode-client-cleanup.test.ts new file mode 100644 index 0000000..58c0e54 --- /dev/null +++ b/src/__tests__/opencode-client-cleanup.test.ts @@ -0,0 +1,248 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; + +class MockEventStream implements AsyncGenerator { + private index = 0; + private readonly events: unknown[]; + readonly returnSpy = vi.fn(async () => ({ done: true as const, value: undefined })); + + constructor(events: unknown[]) { + this.events = events; + } + + [Symbol.asyncIterator](): AsyncGenerator { + return this; + } + + async next(): Promise> { + if (this.index >= this.events.length) { + return { done: true, value: undefined }; + } + const value = this.events[this.index]; + this.index += 1; + return { done: false, value }; + } + + async return(): Promise> { + return this.returnSpy(); + } + + async throw(e?: unknown): Promise> { + throw e; + } +} + +class HangingAfterEventsStream implements AsyncGenerator { + private index = 0; + private closed = false; + private pendingResolve: ((value: IteratorResult) => void) | undefined; + readonly returnSpy = vi.fn(async () => { + this.closed = true; + this.pendingResolve?.({ done: true, value: undefined }); + return { done: true as const, value: undefined }; + }); + + constructor(private readonly events: unknown[]) {} + + [Symbol.asyncIterator](): AsyncGenerator { + return this; + } + + async next(): Promise> { + if (this.closed) { + return { done: true, value: undefined }; + } + if (this.index < this.events.length) { + const value = this.events[this.index]; + this.index += 1; + return { done: false, value }; + } + return new Promise>((resolve) => { + this.pendingResolve = resolve; + }); + } + + async return(): Promise> { + return this.returnSpy(); + } + + async throw(e?: unknown): Promise> { + throw e; + } +} + +const { createOpencodeMock } = vi.hoisted(() => ({ + createOpencodeMock: vi.fn(), +})); + +vi.mock('node:net', () => ({ + createServer: () => { + const handlers = new Map void>(); + return { + unref: vi.fn(), + on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { + handlers.set(event, handler); + }), + listen: vi.fn((_port: number, _host: string, cb: () => void) => { + cb(); + }), + address: vi.fn(() => ({ port: 62000 })), + close: vi.fn((cb?: (err?: Error) => void) => cb?.()), + }; + }, +})); + +vi.mock('@opencode-ai/sdk/v2', () => ({ + createOpencode: createOpencodeMock, +})); + +describe('OpenCodeClient stream cleanup', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it('should close SSE stream when session.idle is received', async () => { + const { OpenCodeClient } = await import('../infra/opencode/client.js'); + const stream = new MockEventStream([ + { + type: 'session.idle', + properties: { sessionID: 'session-1' }, + }, + ]); + + const promptAsync = vi.fn().mockResolvedValue(undefined); + const sessionCreate = vi.fn().mockResolvedValue({ data: { id: 'session-1' } }); + const disposeInstance = vi.fn().mockResolvedValue({ data: {} }); + + const subscribe = vi.fn().mockResolvedValue({ stream }); + createOpencodeMock.mockResolvedValue({ + client: { + instance: { dispose: disposeInstance }, + session: { create: sessionCreate, promptAsync }, + event: { subscribe }, + permission: { reply: vi.fn() }, + }, + server: { close: vi.fn() }, + }); + + const client = new OpenCodeClient(); + const result = await client.call('interactive', 'hello', { + cwd: '/tmp', + model: 'opencode/big-pickle', + }); + + expect(result.status).toBe('done'); + expect(stream.returnSpy).toHaveBeenCalled(); + expect(disposeInstance).toHaveBeenCalledWith( + { directory: '/tmp' }, + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + expect(subscribe).toHaveBeenCalledWith( + { directory: '/tmp' }, + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + }); + + it('should close SSE stream when session.error is received', async () => { + const { OpenCodeClient } = await import('../infra/opencode/client.js'); + const stream = new MockEventStream([ + { + type: 'session.error', + properties: { + sessionID: 'session-2', + error: { name: 'Error', data: { message: 'boom' } }, + }, + }, + ]); + + const promptAsync = vi.fn().mockResolvedValue(undefined); + const sessionCreate = vi.fn().mockResolvedValue({ data: { id: 'session-2' } }); + const disposeInstance = vi.fn().mockResolvedValue({ data: {} }); + + const subscribe = vi.fn().mockResolvedValue({ stream }); + createOpencodeMock.mockResolvedValue({ + client: { + instance: { dispose: disposeInstance }, + session: { create: sessionCreate, promptAsync }, + event: { subscribe }, + permission: { reply: vi.fn() }, + }, + server: { close: vi.fn() }, + }); + + const client = new OpenCodeClient(); + const result = await client.call('interactive', 'hello', { + cwd: '/tmp', + model: 'opencode/big-pickle', + }); + + expect(result.status).toBe('error'); + expect(result.content).toContain('boom'); + expect(stream.returnSpy).toHaveBeenCalled(); + expect(disposeInstance).toHaveBeenCalledWith( + { directory: '/tmp' }, + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + expect(subscribe).toHaveBeenCalledWith( + { directory: '/tmp' }, + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + }); + + it('should complete without hanging when assistant message is completed', async () => { + const { OpenCodeClient } = await import('../infra/opencode/client.js'); + const stream = new HangingAfterEventsStream([ + { + type: 'message.part.updated', + properties: { + part: { id: 'p-1', type: 'text', text: 'done' }, + delta: 'done', + }, + }, + { + type: 'message.updated', + properties: { + info: { + sessionID: 'session-3', + role: 'assistant', + time: { created: Date.now(), completed: Date.now() + 1 }, + }, + }, + }, + ]); + + const promptAsync = vi.fn().mockResolvedValue(undefined); + const sessionCreate = vi.fn().mockResolvedValue({ data: { id: 'session-3' } }); + const disposeInstance = vi.fn().mockResolvedValue({ data: {} }); + + const subscribe = vi.fn().mockResolvedValue({ stream }); + createOpencodeMock.mockResolvedValue({ + client: { + instance: { dispose: disposeInstance }, + session: { create: sessionCreate, promptAsync }, + event: { subscribe }, + permission: { reply: vi.fn() }, + }, + server: { close: vi.fn() }, + }); + + const client = new OpenCodeClient(); + const result = await Promise.race([ + client.call('interactive', 'hello', { + cwd: '/tmp', + model: 'opencode/big-pickle', + }), + new Promise((_, reject) => setTimeout(() => reject(new Error('timed out')), 500)), + ]); + + expect(result.status).toBe('done'); + expect(result.content).toBe('done'); + expect(disposeInstance).toHaveBeenCalledWith( + { directory: '/tmp' }, + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + expect(subscribe).toHaveBeenCalledWith( + { directory: '/tmp' }, + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + }); +}); diff --git a/src/app/cli/index.ts b/src/app/cli/index.ts index 05653b5..c93b140 100644 --- a/src/app/cli/index.ts +++ b/src/app/cli/index.ts @@ -35,6 +35,13 @@ import { executeDefaultAction } from './routing.js'; // Normal parsing for all other cases (including '#' prefixed inputs) await program.parseAsync(); + + // Some providers/SDKs may leave active handles even after command completion. + // Keep only watch mode as a long-running command; all others should exit explicitly. + const rootArg = process.argv.slice(2)[0]; + if (rootArg !== 'watch') { + process.exit(0); + } })().catch((err) => { console.error(err); process.exit(1); diff --git a/src/app/cli/routing.ts b/src/app/cli/routing.ts index 7073aeb..aae09e7 100644 --- a/src/app/cli/routing.ts +++ b/src/app/cli/routing.ts @@ -6,11 +6,10 @@ */ import { info, error, withProgress } from '../../shared/ui/index.js'; -import { confirm } from '../../shared/prompt/index.js'; import { getErrorMessage } from '../../shared/utils/index.js'; import { getLabel } from '../../shared/i18n/index.js'; import { fetchIssue, formatIssueAsTask, checkGhCli, parseIssueNumbers, type GitHubIssue } from '../../infra/github/index.js'; -import { selectAndExecuteTask, determinePiece, saveTaskFromInteractive, createIssueAndSaveTask, type SelectAndExecuteOptions } from '../../features/tasks/index.js'; +import { selectAndExecuteTask, determinePiece, saveTaskFromInteractive, createIssueFromTask, type SelectAndExecuteOptions } from '../../features/tasks/index.js'; import { executePipeline } from '../../features/pipeline/index.js'; import { interactiveMode, @@ -205,8 +204,14 @@ export async function executeDefaultAction(task?: string): Promise { break; case 'create_issue': - if (await confirm('Add this issue to tasks?', true)) { - await createIssueAndSaveTask(resolvedCwd, result.task, pieceId); + { + const issueNumber = createIssueFromTask(result.task); + if (issueNumber !== undefined) { + await saveTaskFromInteractive(resolvedCwd, result.task, pieceId, { + issue: issueNumber, + confirmAtEndMessage: 'Add this issue to tasks?', + }); + } } break; diff --git a/src/features/tasks/add/index.ts b/src/features/tasks/add/index.ts index 643c3e5..cd48f40 100644 --- a/src/features/tasks/add/index.ts +++ b/src/features/tasks/add/index.ts @@ -149,9 +149,15 @@ export async function saveTaskFromInteractive( cwd: string, task: string, piece?: string, - options?: { issue?: number }, + options?: { issue?: number; confirmAtEndMessage?: string }, ): Promise { const settings = await promptWorktreeSettings(); + if (options?.confirmAtEndMessage) { + const approved = await confirm(options.confirmAtEndMessage, true); + if (!approved) { + return; + } + } const created = await saveTaskFile(cwd, task, { piece, issue: options?.issue, ...settings }); displayTaskCreationResult(created, settings, piece); } diff --git a/src/infra/opencode/OpenCodeStreamHandler.ts b/src/infra/opencode/OpenCodeStreamHandler.ts index dfd70d6..421bb6f 100644 --- a/src/infra/opencode/OpenCodeStreamHandler.ts +++ b/src/infra/opencode/OpenCodeStreamHandler.ts @@ -47,6 +47,14 @@ export interface OpenCodeSessionIdleEvent { properties: { sessionID: string }; } +export interface OpenCodeSessionStatusEvent { + type: 'session.status'; + properties: { + sessionID: string; + status: { type: 'idle' | 'busy' | 'retry'; attempt?: number; message?: string; next?: number }; + }; +} + export interface OpenCodeSessionErrorEvent { type: 'session.error'; properties: { @@ -55,6 +63,18 @@ export interface OpenCodeSessionErrorEvent { }; } +export interface OpenCodeMessageUpdatedEvent { + type: 'message.updated'; + properties: { + info: { + sessionID: string; + role: 'assistant' | 'user'; + time?: { created?: number; completed?: number }; + error?: unknown; + }; + }; +} + export interface OpenCodePermissionAskedEvent { type: 'permission.asked'; properties: { @@ -69,6 +89,8 @@ export interface OpenCodePermissionAskedEvent { export type OpenCodeStreamEvent = | OpenCodeMessagePartUpdatedEvent + | OpenCodeMessageUpdatedEvent + | OpenCodeSessionStatusEvent | OpenCodeSessionIdleEvent | OpenCodeSessionErrorEvent | OpenCodePermissionAskedEvent diff --git a/src/infra/opencode/client.ts b/src/infra/opencode/client.ts index fa2690e..2161c4d 100644 --- a/src/infra/opencode/client.ts +++ b/src/infra/opencode/client.ts @@ -41,6 +41,23 @@ const OPENCODE_RETRYABLE_ERROR_PATTERNS = [ 'failed to start server on port', ]; +function extractOpenCodeErrorMessage(error: unknown): string | undefined { + if (!error || typeof error !== 'object') { + return undefined; + } + const value = error as { message?: unknown; data?: { message?: unknown }; name?: unknown }; + if (typeof value.message === 'string' && value.message.length > 0) { + return value.message; + } + if (typeof value.data?.message === 'string' && value.data.message.length > 0) { + return value.data.message; + } + if (typeof value.name === 'string' && value.name.length > 0) { + return value.name; + } + return undefined; +} + function getCommonPrefixLength(a: string, b: string): number { const max = Math.min(a.length, b.length); let i = 0; @@ -149,6 +166,7 @@ export class OpenCodeClient { const timeoutMessage = `OpenCode stream timed out after ${Math.floor(OPENCODE_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`; let abortCause: 'timeout' | 'external' | undefined; let serverClose: (() => void) | undefined; + let opencodeApiClient: Awaited>['client'] | undefined; const resetIdleTimeout = (): void => { if (idleTimeoutId !== undefined) { @@ -196,6 +214,7 @@ export class OpenCodeClient { signal: streamAbortController.signal, config, }); + opencodeApiClient = client; serverClose = server.close; const sessionResult = options.sessionId @@ -206,16 +225,21 @@ export class OpenCodeClient { if (!sessionId) { throw new Error('Failed to create OpenCode session'); } - - const { stream } = await client.event.subscribe({ directory: options.cwd }); + const { stream } = await client.event.subscribe( + { directory: options.cwd }, + { signal: streamAbortController.signal }, + ); resetIdleTimeout(); - await client.session.promptAsync({ - sessionID: sessionId, - directory: options.cwd, - model: parsedModel, - parts: [{ type: 'text' as const, text: fullPrompt }], - }); + await client.session.promptAsync( + { + sessionID: sessionId, + directory: options.cwd, + model: parsedModel, + parts: [{ type: 'text' as const, text: fullPrompt }], + }, + { signal: streamAbortController.signal }, + ); emitInit(options.onStream, options.model, sessionId); @@ -232,7 +256,6 @@ export class OpenCodeClient { resetIdleTimeout(); const sseEvent = event as OpenCodeStreamEvent; - if (sseEvent.type === 'message.part.updated') { const props = sseEvent.properties as { part: OpenCodePart; delta?: string }; const part = props.part; @@ -279,6 +302,40 @@ export class OpenCodeClient { continue; } + if (sseEvent.type === 'message.updated') { + const messageProps = sseEvent.properties as { + info?: { + sessionID?: string; + role?: 'assistant' | 'user'; + time?: { completed?: number }; + error?: unknown; + }; + }; + const info = messageProps.info; + const isCurrentAssistantMessage = info?.sessionID === sessionId && info.role === 'assistant'; + const isCompleted = typeof info?.time?.completed === 'number'; + if (isCurrentAssistantMessage && isCompleted) { + const streamError = extractOpenCodeErrorMessage(info.error); + if (streamError) { + success = false; + failureMessage = streamError; + } + break; + } + continue; + } + + if (sseEvent.type === 'session.status') { + const statusProps = sseEvent.properties as { + sessionID?: string; + status?: { type?: string }; + }; + if (statusProps.sessionID === sessionId && statusProps.status?.type === 'idle') { + break; + } + continue; + } + if (sseEvent.type === 'session.idle') { const idleProps = sseEvent.properties as { sessionID: string }; if (idleProps.sessionID === sessionId) { @@ -365,9 +422,28 @@ export class OpenCodeClient { if (options.abortSignal) { options.abortSignal.removeEventListener('abort', onExternalAbort); } + if (opencodeApiClient) { + const disposeAbortController = new AbortController(); + const disposeTimeoutId = setTimeout(() => { + disposeAbortController.abort(); + }, 3000); + try { + await opencodeApiClient.instance.dispose( + { directory: options.cwd }, + { signal: disposeAbortController.signal }, + ); + } catch { + // Ignore dispose errors during cleanup. + } finally { + clearTimeout(disposeTimeoutId); + } + } if (serverClose) { serverClose(); } + if (!streamAbortController.signal.aborted) { + streamAbortController.abort(); + } } } diff --git a/src/shared/prompt/confirm.ts b/src/shared/prompt/confirm.ts index bb82334..ac3bc4f 100644 --- a/src/shared/prompt/confirm.ts +++ b/src/shared/prompt/confirm.ts @@ -9,6 +9,16 @@ import * as readline from 'node:readline'; import chalk from 'chalk'; import { resolveTtyPolicy, assertTtyIfForced } from './tty.js'; +function pauseStdinSafely(): void { + try { + if (process.stdin.readable && !process.stdin.destroyed) { + process.stdin.pause(); + } + } catch { + // Ignore stdin state errors during prompt cleanup. + } +} + /** * Prompt user for simple text input * @returns User input or null if cancelled @@ -27,6 +37,7 @@ export async function promptInput(message: string): Promise { return new Promise((resolve) => { rl.question(chalk.green(message + ': '), (answer) => { rl.close(); + pauseStdinSafely(); const trimmed = answer.trim(); if (!trimmed) { @@ -98,6 +109,7 @@ export async function confirm(message: string, defaultYes = true): Promise { rl.question(chalk.green(`${message} ${hint}: `), (answer) => { rl.close(); + pauseStdinSafely(); const trimmed = answer.trim().toLowerCase();