fix: avoid leading-boundary timed flush fragmentation

This commit is contained in:
nrslib 2026-03-02 14:18:16 +09:00
parent 52968ac873
commit bddb66f85d
3 changed files with 109 additions and 6 deletions

View File

@ -0,0 +1,76 @@
/**
* Reproduction tests for stdout/stderr interleaving around streamed text output.
*/
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { ParallelLogger } from '../core/piece/index.js';
import type { StreamEvent } from '../core/piece/index.js';
describe('stream output interleaving', () => {
let output: string[];
beforeEach(() => {
vi.useRealTimers();
output = [];
});
it('reproduces sentence fragments when timed flush + worker-pool log interleave', async () => {
vi.useFakeTimers();
const logger = new ParallelLogger({
subMovementNames: ['testing-review'],
writeFn: (text) => output.push(`STDOUT:${text}`),
flushIntervalMs: 10,
minTimedFlushChars: 24,
maxTimedBufferMs: 200,
});
const handler = logger.createStreamHandler('testing-review', 0);
handler({
type: 'text',
data: { text: '[#429][reviewers][testing-review](2/30)(1) should include' },
} as StreamEvent);
await vi.advanceTimersByTimeAsync(20);
output.push('STDERR:[04:47:20.401] [DEBUG] [worker-pool] poll_tick\n');
handler({
type: 'text',
data: { text: ' ag' },
} as StreamEvent);
await vi.advanceTimersByTimeAsync(20);
output.push('STDERR:[04:47:20.401] [DEBUG] [worker-pool] no_new_tasks\n');
handler({
type: 'text',
data: { text: 'ent Error in throw message when provided' },
} as StreamEvent);
logger.flush();
expect(output.length).toBeGreaterThanOrEqual(2);
const rendered = output.join('');
expect(rendered).toContain('should');
expect(rendered).toContain('include');
expect(rendered).toContain('ent Error in throw message when provided');
expect(rendered).toContain('[worker-pool] poll_tick');
expect(rendered).toContain('no_new_tasks');
const stdoutText = output.filter((line) => line.startsWith('STDOUT:')).join('');
const stdoutChunks = output
.filter((line) => line.startsWith('STDOUT:'))
.map((line) => line.replace(/^STDOUT:/, '').replace(/^\u001b\[[0-9;]*m\[[^\]]+\]\u001b\[0m /, ''));
// この再現では、単独断片 " ag" がSTDOUTチャンクとして流れないことを確認する
expect(stdoutText).toContain('ent Error in throw message when provided');
expect(stdoutChunks).not.toContain(' ag');
expect(stdoutChunks.join('')).toContain('include agent Error in throw message when provided');
vi.useRealTimers();
});
});

View File

@ -51,6 +51,31 @@ describe('LineTimeSliceBuffer', () => {
expect(buffer.flushAll()).toEqual([{ key: 'a', text: 'buffer' }]);
});
it.each([1, 10, 24, 30])(
'先頭の境界文字だけでは切らず、minTimedFlushChars=%s でも同一語を分割しない',
async (minTimedFlushChars) => {
vi.useFakeTimers();
const flushed: Array<{ key: string; text: string }> = [];
const buffer = new LineTimeSliceBuffer({
flushIntervalMs: 50,
minTimedFlushChars,
maxTimedBufferMs: 120,
onTimedFlush: (key, text) => flushed.push({ key, text }),
});
buffer.push('a', ' ag');
await vi.advanceTimersByTimeAsync(50);
expect(flushed).toHaveLength(0);
buffer.push('a', 'ent');
await vi.advanceTimersByTimeAsync(100);
expect(flushed).toHaveLength(1);
expect(flushed[0]).toEqual({ key: 'a', text: ' agent' });
expect(buffer.flushAll()).toEqual([]);
},
);
it('境界がない文字列は maxTimedBufferMs 経過後に強制flushする', async () => {
vi.useFakeTimers();
const flushed: Array<{ key: string; text: string }> = [];

View File

@ -100,9 +100,10 @@ export class LineTimeSliceBuffer {
return undefined;
}
const boundaryIndex = this.findBoundaryIndex(buffer);
const flushIndex = boundaryIndex > 0
? boundaryIndex
const minBoundaryIndex = Math.max(this.minTimedFlushChars - 1, 1);
const boundaryIndex = this.findBoundaryIndex(buffer, minBoundaryIndex);
const flushIndex = boundaryIndex >= 0
? boundaryIndex + 1
: buffer.length;
const flushText = buffer.slice(0, flushIndex);
@ -118,15 +119,16 @@ export class LineTimeSliceBuffer {
return flushText;
}
private findBoundaryIndex(text: string): number {
private findBoundaryIndex(text: string, minIndex: number): number {
let lastIndex = -1;
for (let i = 0; i < text.length; i += 1) {
const start = Math.min(Math.max(minIndex, 0), text.length - 1);
for (let i = start; i < text.length; i += 1) {
const ch = text.charAt(i);
if (this.isBoundary(ch)) {
lastIndex = i;
}
}
return lastIndex + 1;
return lastIndex;
}
private isBoundary(ch: string): boolean {