diff --git a/src/__tests__/parallel-output-interleaving.test.ts b/src/__tests__/parallel-output-interleaving.test.ts new file mode 100644 index 0000000..40d3844 --- /dev/null +++ b/src/__tests__/parallel-output-interleaving.test.ts @@ -0,0 +1,76 @@ +/** + * Reproduction tests for stdout/stderr interleaving around streamed text output. + */ + +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { ParallelLogger } from '../core/piece/index.js'; +import type { StreamEvent } from '../core/piece/index.js'; + +describe('stream output interleaving', () => { + let output: string[]; + + beforeEach(() => { + vi.useRealTimers(); + output = []; + }); + + it('reproduces sentence fragments when timed flush + worker-pool log interleave', async () => { + vi.useFakeTimers(); + + const logger = new ParallelLogger({ + subMovementNames: ['testing-review'], + writeFn: (text) => output.push(`STDOUT:${text}`), + flushIntervalMs: 10, + minTimedFlushChars: 24, + maxTimedBufferMs: 200, + }); + + const handler = logger.createStreamHandler('testing-review', 0); + + handler({ + type: 'text', + data: { text: '[#429][reviewers][testing-review](2/30)(1) should include' }, + } as StreamEvent); + + await vi.advanceTimersByTimeAsync(20); + + output.push('STDERR:[04:47:20.401] [DEBUG] [worker-pool] poll_tick\n'); + + handler({ + type: 'text', + data: { text: ' ag' }, + } as StreamEvent); + + await vi.advanceTimersByTimeAsync(20); + + output.push('STDERR:[04:47:20.401] [DEBUG] [worker-pool] no_new_tasks\n'); + + handler({ + type: 'text', + data: { text: 'ent Error in throw message when provided' }, + } as StreamEvent); + + logger.flush(); + + expect(output.length).toBeGreaterThanOrEqual(2); + + const rendered = output.join(''); + expect(rendered).toContain('should'); + expect(rendered).toContain('include'); + expect(rendered).toContain('ent Error in throw message when provided'); + expect(rendered).toContain('[worker-pool] poll_tick'); + expect(rendered).toContain('no_new_tasks'); + + const stdoutText = output.filter((line) => line.startsWith('STDOUT:')).join(''); + const stdoutChunks = output + .filter((line) => line.startsWith('STDOUT:')) + .map((line) => line.replace(/^STDOUT:/, '').replace(/^\u001b\[[0-9;]*m\[[^\]]+\]\u001b\[0m /, '')); + + // この再現では、単独断片 " ag" がSTDOUTチャンクとして流れないことを確認する + expect(stdoutText).toContain('ent Error in throw message when provided'); + expect(stdoutChunks).not.toContain(' ag'); + expect(stdoutChunks.join('')).toContain('include agent Error in throw message when provided'); + + vi.useRealTimers(); + }); +}); diff --git a/src/__tests__/stream-buffer.test.ts b/src/__tests__/stream-buffer.test.ts index ca64966..f4459df 100644 --- a/src/__tests__/stream-buffer.test.ts +++ b/src/__tests__/stream-buffer.test.ts @@ -51,6 +51,31 @@ describe('LineTimeSliceBuffer', () => { expect(buffer.flushAll()).toEqual([{ key: 'a', text: 'buffer' }]); }); + it.each([1, 10, 24, 30])( + '先頭の境界文字だけでは切らず、minTimedFlushChars=%s でも同一語を分割しない', + async (minTimedFlushChars) => { + vi.useFakeTimers(); + const flushed: Array<{ key: string; text: string }> = []; + const buffer = new LineTimeSliceBuffer({ + flushIntervalMs: 50, + minTimedFlushChars, + maxTimedBufferMs: 120, + onTimedFlush: (key, text) => flushed.push({ key, text }), + }); + + buffer.push('a', ' ag'); + await vi.advanceTimersByTimeAsync(50); + expect(flushed).toHaveLength(0); + + buffer.push('a', 'ent'); + await vi.advanceTimersByTimeAsync(100); + + expect(flushed).toHaveLength(1); + expect(flushed[0]).toEqual({ key: 'a', text: ' agent' }); + expect(buffer.flushAll()).toEqual([]); + }, + ); + it('境界がない文字列は maxTimedBufferMs 経過後に強制flushする', async () => { vi.useFakeTimers(); const flushed: Array<{ key: string; text: string }> = []; diff --git a/src/core/piece/engine/stream-buffer.ts b/src/core/piece/engine/stream-buffer.ts index d40c36d..1f0d12c 100644 --- a/src/core/piece/engine/stream-buffer.ts +++ b/src/core/piece/engine/stream-buffer.ts @@ -100,9 +100,10 @@ export class LineTimeSliceBuffer { return undefined; } - const boundaryIndex = this.findBoundaryIndex(buffer); - const flushIndex = boundaryIndex > 0 - ? boundaryIndex + const minBoundaryIndex = Math.max(this.minTimedFlushChars - 1, 1); + const boundaryIndex = this.findBoundaryIndex(buffer, minBoundaryIndex); + const flushIndex = boundaryIndex >= 0 + ? boundaryIndex + 1 : buffer.length; const flushText = buffer.slice(0, flushIndex); @@ -118,15 +119,16 @@ export class LineTimeSliceBuffer { return flushText; } - private findBoundaryIndex(text: string): number { + private findBoundaryIndex(text: string, minIndex: number): number { let lastIndex = -1; - for (let i = 0; i < text.length; i += 1) { + const start = Math.min(Math.max(minIndex, 0), text.length - 1); + for (let i = start; i < text.length; i += 1) { const ch = text.charAt(i); if (this.isBoundary(ch)) { lastIndex = i; } } - return lastIndex + 1; + return lastIndex; } private isBoundary(ch: string): boolean {