Merge pull request #36 from nrslib/issue/27-ndjson-realtime-log
feat: ログをNDJSON形式に変更しリアルタイム書き出し対応
This commit is contained in:
commit
dba25a539b
@ -8,10 +8,17 @@ import { join } from 'node:path';
|
|||||||
import { tmpdir } from 'node:os';
|
import { tmpdir } from 'node:os';
|
||||||
import {
|
import {
|
||||||
createSessionLog,
|
createSessionLog,
|
||||||
saveSessionLog,
|
|
||||||
updateLatestPointer,
|
updateLatestPointer,
|
||||||
|
initNdjsonLog,
|
||||||
|
appendNdjsonLine,
|
||||||
|
loadNdjsonLog,
|
||||||
|
loadSessionLog,
|
||||||
type LatestLogPointer,
|
type LatestLogPointer,
|
||||||
type SessionLog,
|
type SessionLog,
|
||||||
|
type NdjsonRecord,
|
||||||
|
type NdjsonStepComplete,
|
||||||
|
type NdjsonWorkflowComplete,
|
||||||
|
type NdjsonWorkflowAbort,
|
||||||
} from '../utils/session.js';
|
} from '../utils/session.js';
|
||||||
|
|
||||||
/** Create a temp project directory with .takt/logs structure */
|
/** Create a temp project directory with .takt/logs structure */
|
||||||
@ -21,49 +28,6 @@ function createTempProject(): string {
|
|||||||
return dir;
|
return dir;
|
||||||
}
|
}
|
||||||
|
|
||||||
describe('saveSessionLog (atomic)', () => {
|
|
||||||
let projectDir: string;
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
projectDir = createTempProject();
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(() => {
|
|
||||||
rmSync(projectDir, { recursive: true, force: true });
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should create session log file with correct content', () => {
|
|
||||||
const log = createSessionLog('test task', projectDir, 'default');
|
|
||||||
const sessionId = 'test-session-001';
|
|
||||||
|
|
||||||
const filepath = saveSessionLog(log, sessionId, projectDir);
|
|
||||||
|
|
||||||
expect(existsSync(filepath)).toBe(true);
|
|
||||||
const content = JSON.parse(readFileSync(filepath, 'utf-8')) as SessionLog;
|
|
||||||
expect(content.task).toBe('test task');
|
|
||||||
expect(content.workflowName).toBe('default');
|
|
||||||
expect(content.status).toBe('running');
|
|
||||||
expect(content.iterations).toBe(0);
|
|
||||||
expect(content.history).toEqual([]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should overwrite existing log file on subsequent saves', () => {
|
|
||||||
const log = createSessionLog('test task', projectDir, 'default');
|
|
||||||
const sessionId = 'test-session-002';
|
|
||||||
|
|
||||||
saveSessionLog(log, sessionId, projectDir);
|
|
||||||
|
|
||||||
log.iterations = 3;
|
|
||||||
log.status = 'completed';
|
|
||||||
saveSessionLog(log, sessionId, projectDir);
|
|
||||||
|
|
||||||
const filepath = join(projectDir, '.takt', 'logs', `${sessionId}.json`);
|
|
||||||
const content = JSON.parse(readFileSync(filepath, 'utf-8')) as SessionLog;
|
|
||||||
expect(content.iterations).toBe(3);
|
|
||||||
expect(content.status).toBe('completed');
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('updateLatestPointer', () => {
|
describe('updateLatestPointer', () => {
|
||||||
let projectDir: string;
|
let projectDir: string;
|
||||||
|
|
||||||
@ -86,7 +50,7 @@ describe('updateLatestPointer', () => {
|
|||||||
|
|
||||||
const pointer = JSON.parse(readFileSync(latestPath, 'utf-8')) as LatestLogPointer;
|
const pointer = JSON.parse(readFileSync(latestPath, 'utf-8')) as LatestLogPointer;
|
||||||
expect(pointer.sessionId).toBe('abc-123');
|
expect(pointer.sessionId).toBe('abc-123');
|
||||||
expect(pointer.logFile).toBe('abc-123.json');
|
expect(pointer.logFile).toBe('abc-123.jsonl');
|
||||||
expect(pointer.task).toBe('my task');
|
expect(pointer.task).toBe('my task');
|
||||||
expect(pointer.workflowName).toBe('default');
|
expect(pointer.workflowName).toBe('default');
|
||||||
expect(pointer.status).toBe('running');
|
expect(pointer.status).toBe('running');
|
||||||
@ -172,3 +136,314 @@ describe('updateLatestPointer', () => {
|
|||||||
expect(pointer.iterations).toBe(3);
|
expect(pointer.iterations).toBe(3);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('NDJSON log', () => {
|
||||||
|
let projectDir: string;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
projectDir = createTempProject();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
rmSync(projectDir, { recursive: true, force: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('initNdjsonLog', () => {
|
||||||
|
it('should create a .jsonl file with workflow_start record', () => {
|
||||||
|
const filepath = initNdjsonLog('sess-001', 'my task', 'default', projectDir);
|
||||||
|
|
||||||
|
expect(filepath).toContain('sess-001.jsonl');
|
||||||
|
expect(existsSync(filepath)).toBe(true);
|
||||||
|
|
||||||
|
const content = readFileSync(filepath, 'utf-8');
|
||||||
|
const lines = content.trim().split('\n');
|
||||||
|
expect(lines).toHaveLength(1);
|
||||||
|
|
||||||
|
const record = JSON.parse(lines[0]!) as NdjsonRecord;
|
||||||
|
expect(record.type).toBe('workflow_start');
|
||||||
|
if (record.type === 'workflow_start') {
|
||||||
|
expect(record.task).toBe('my task');
|
||||||
|
expect(record.workflowName).toBe('default');
|
||||||
|
expect(record.startTime).toBeDefined();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('appendNdjsonLine', () => {
|
||||||
|
it('should append records as individual lines', () => {
|
||||||
|
const filepath = initNdjsonLog('sess-002', 'task', 'wf', projectDir);
|
||||||
|
|
||||||
|
const stepStart: NdjsonRecord = {
|
||||||
|
type: 'step_start',
|
||||||
|
step: 'plan',
|
||||||
|
agent: 'planner',
|
||||||
|
iteration: 1,
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
};
|
||||||
|
appendNdjsonLine(filepath, stepStart);
|
||||||
|
|
||||||
|
const streamRecord: NdjsonRecord = {
|
||||||
|
type: 'stream',
|
||||||
|
step: 'plan',
|
||||||
|
event: { type: 'text', data: { text: 'hello' } },
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
};
|
||||||
|
appendNdjsonLine(filepath, streamRecord);
|
||||||
|
|
||||||
|
const content = readFileSync(filepath, 'utf-8');
|
||||||
|
const lines = content.trim().split('\n');
|
||||||
|
expect(lines).toHaveLength(3); // workflow_start + step_start + stream
|
||||||
|
|
||||||
|
const parsed0 = JSON.parse(lines[0]!) as NdjsonRecord;
|
||||||
|
expect(parsed0.type).toBe('workflow_start');
|
||||||
|
|
||||||
|
const parsed1 = JSON.parse(lines[1]!) as NdjsonRecord;
|
||||||
|
expect(parsed1.type).toBe('step_start');
|
||||||
|
if (parsed1.type === 'step_start') {
|
||||||
|
expect(parsed1.step).toBe('plan');
|
||||||
|
expect(parsed1.agent).toBe('planner');
|
||||||
|
expect(parsed1.iteration).toBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
const parsed2 = JSON.parse(lines[2]!) as NdjsonRecord;
|
||||||
|
expect(parsed2.type).toBe('stream');
|
||||||
|
if (parsed2.type === 'stream') {
|
||||||
|
expect(parsed2.event.type).toBe('text');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('loadNdjsonLog', () => {
|
||||||
|
it('should reconstruct SessionLog from NDJSON file', () => {
|
||||||
|
const filepath = initNdjsonLog('sess-003', 'build app', 'default', projectDir);
|
||||||
|
|
||||||
|
// Add step_start + step_complete
|
||||||
|
appendNdjsonLine(filepath, {
|
||||||
|
type: 'step_start',
|
||||||
|
step: 'plan',
|
||||||
|
agent: 'planner',
|
||||||
|
iteration: 1,
|
||||||
|
timestamp: '2025-01-01T00:00:01.000Z',
|
||||||
|
});
|
||||||
|
|
||||||
|
const stepComplete: NdjsonStepComplete = {
|
||||||
|
type: 'step_complete',
|
||||||
|
step: 'plan',
|
||||||
|
agent: 'planner',
|
||||||
|
status: 'done',
|
||||||
|
content: 'Plan completed',
|
||||||
|
instruction: 'Create a plan',
|
||||||
|
matchedRuleIndex: 0,
|
||||||
|
matchedRuleMethod: 'phase3_tag',
|
||||||
|
timestamp: '2025-01-01T00:00:02.000Z',
|
||||||
|
};
|
||||||
|
appendNdjsonLine(filepath, stepComplete);
|
||||||
|
|
||||||
|
const complete: NdjsonWorkflowComplete = {
|
||||||
|
type: 'workflow_complete',
|
||||||
|
iterations: 1,
|
||||||
|
endTime: '2025-01-01T00:00:03.000Z',
|
||||||
|
};
|
||||||
|
appendNdjsonLine(filepath, complete);
|
||||||
|
|
||||||
|
const log = loadNdjsonLog(filepath);
|
||||||
|
expect(log).not.toBeNull();
|
||||||
|
expect(log!.task).toBe('build app');
|
||||||
|
expect(log!.workflowName).toBe('default');
|
||||||
|
expect(log!.status).toBe('completed');
|
||||||
|
expect(log!.iterations).toBe(1);
|
||||||
|
expect(log!.endTime).toBe('2025-01-01T00:00:03.000Z');
|
||||||
|
expect(log!.history).toHaveLength(1);
|
||||||
|
expect(log!.history[0]!.step).toBe('plan');
|
||||||
|
expect(log!.history[0]!.content).toBe('Plan completed');
|
||||||
|
expect(log!.history[0]!.matchedRuleIndex).toBe(0);
|
||||||
|
expect(log!.history[0]!.matchedRuleMethod).toBe('phase3_tag');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should handle aborted workflow', () => {
|
||||||
|
const filepath = initNdjsonLog('sess-004', 'failing task', 'wf', projectDir);
|
||||||
|
|
||||||
|
appendNdjsonLine(filepath, {
|
||||||
|
type: 'step_start',
|
||||||
|
step: 'impl',
|
||||||
|
agent: 'coder',
|
||||||
|
iteration: 1,
|
||||||
|
timestamp: '2025-01-01T00:00:01.000Z',
|
||||||
|
});
|
||||||
|
|
||||||
|
appendNdjsonLine(filepath, {
|
||||||
|
type: 'step_complete',
|
||||||
|
step: 'impl',
|
||||||
|
agent: 'coder',
|
||||||
|
status: 'error',
|
||||||
|
content: 'Failed',
|
||||||
|
instruction: 'Do the thing',
|
||||||
|
error: 'compile error',
|
||||||
|
timestamp: '2025-01-01T00:00:02.000Z',
|
||||||
|
} satisfies NdjsonStepComplete);
|
||||||
|
|
||||||
|
const abort: NdjsonWorkflowAbort = {
|
||||||
|
type: 'workflow_abort',
|
||||||
|
iterations: 1,
|
||||||
|
reason: 'Max iterations reached',
|
||||||
|
endTime: '2025-01-01T00:00:03.000Z',
|
||||||
|
};
|
||||||
|
appendNdjsonLine(filepath, abort);
|
||||||
|
|
||||||
|
const log = loadNdjsonLog(filepath);
|
||||||
|
expect(log).not.toBeNull();
|
||||||
|
expect(log!.status).toBe('aborted');
|
||||||
|
expect(log!.history[0]!.error).toBe('compile error');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return null for non-existent file', () => {
|
||||||
|
const result = loadNdjsonLog('/nonexistent/path.jsonl');
|
||||||
|
expect(result).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return null for empty file', () => {
|
||||||
|
const logsDir = join(projectDir, '.takt', 'logs');
|
||||||
|
mkdirSync(logsDir, { recursive: true });
|
||||||
|
const filepath = join(logsDir, 'empty.jsonl');
|
||||||
|
writeFileSync(filepath, '', 'utf-8');
|
||||||
|
|
||||||
|
const result = loadNdjsonLog(filepath);
|
||||||
|
expect(result).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should skip stream and step_start records when reconstructing SessionLog', () => {
|
||||||
|
const filepath = initNdjsonLog('sess-005', 'task', 'wf', projectDir);
|
||||||
|
|
||||||
|
// Add various records
|
||||||
|
appendNdjsonLine(filepath, {
|
||||||
|
type: 'step_start',
|
||||||
|
step: 'plan',
|
||||||
|
agent: 'planner',
|
||||||
|
iteration: 1,
|
||||||
|
timestamp: '2025-01-01T00:00:01.000Z',
|
||||||
|
});
|
||||||
|
|
||||||
|
appendNdjsonLine(filepath, {
|
||||||
|
type: 'stream',
|
||||||
|
step: 'plan',
|
||||||
|
event: { type: 'text', data: { text: 'working...' } },
|
||||||
|
timestamp: '2025-01-01T00:00:01.500Z',
|
||||||
|
});
|
||||||
|
|
||||||
|
appendNdjsonLine(filepath, {
|
||||||
|
type: 'step_complete',
|
||||||
|
step: 'plan',
|
||||||
|
agent: 'planner',
|
||||||
|
status: 'done',
|
||||||
|
content: 'Done',
|
||||||
|
instruction: 'Plan it',
|
||||||
|
timestamp: '2025-01-01T00:00:02.000Z',
|
||||||
|
} satisfies NdjsonStepComplete);
|
||||||
|
|
||||||
|
appendNdjsonLine(filepath, {
|
||||||
|
type: 'workflow_complete',
|
||||||
|
iterations: 1,
|
||||||
|
endTime: '2025-01-01T00:00:03.000Z',
|
||||||
|
});
|
||||||
|
|
||||||
|
const log = loadNdjsonLog(filepath);
|
||||||
|
expect(log).not.toBeNull();
|
||||||
|
// Only step_complete adds to history
|
||||||
|
expect(log!.history).toHaveLength(1);
|
||||||
|
expect(log!.iterations).toBe(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('loadSessionLog with .jsonl extension', () => {
|
||||||
|
it('should delegate to loadNdjsonLog for .jsonl files', () => {
|
||||||
|
const filepath = initNdjsonLog('sess-006', 'jsonl task', 'wf', projectDir);
|
||||||
|
|
||||||
|
appendNdjsonLine(filepath, {
|
||||||
|
type: 'step_complete',
|
||||||
|
step: 'plan',
|
||||||
|
agent: 'planner',
|
||||||
|
status: 'done',
|
||||||
|
content: 'Plan done',
|
||||||
|
instruction: 'Plan',
|
||||||
|
timestamp: '2025-01-01T00:00:02.000Z',
|
||||||
|
} satisfies NdjsonStepComplete);
|
||||||
|
|
||||||
|
appendNdjsonLine(filepath, {
|
||||||
|
type: 'workflow_complete',
|
||||||
|
iterations: 1,
|
||||||
|
endTime: '2025-01-01T00:00:03.000Z',
|
||||||
|
});
|
||||||
|
|
||||||
|
// loadSessionLog should handle .jsonl
|
||||||
|
const log = loadSessionLog(filepath);
|
||||||
|
expect(log).not.toBeNull();
|
||||||
|
expect(log!.task).toBe('jsonl task');
|
||||||
|
expect(log!.status).toBe('completed');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should still load legacy .json files', () => {
|
||||||
|
const logsDir = join(projectDir, '.takt', 'logs');
|
||||||
|
mkdirSync(logsDir, { recursive: true });
|
||||||
|
const legacyPath = join(logsDir, 'legacy-001.json');
|
||||||
|
const legacyLog: SessionLog = {
|
||||||
|
task: 'legacy task',
|
||||||
|
projectDir,
|
||||||
|
workflowName: 'wf',
|
||||||
|
iterations: 0,
|
||||||
|
startTime: new Date().toISOString(),
|
||||||
|
status: 'running',
|
||||||
|
history: [],
|
||||||
|
};
|
||||||
|
writeFileSync(legacyPath, JSON.stringify(legacyLog, null, 2), 'utf-8');
|
||||||
|
|
||||||
|
const log = loadSessionLog(legacyPath);
|
||||||
|
expect(log).not.toBeNull();
|
||||||
|
expect(log!.task).toBe('legacy task');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('appendNdjsonLine real-time characteristics', () => {
|
||||||
|
it('should append without overwriting previous content', () => {
|
||||||
|
const filepath = initNdjsonLog('sess-007', 'task', 'wf', projectDir);
|
||||||
|
|
||||||
|
// Read after init
|
||||||
|
const after1 = readFileSync(filepath, 'utf-8').trim().split('\n');
|
||||||
|
expect(after1).toHaveLength(1);
|
||||||
|
|
||||||
|
// Append more records
|
||||||
|
appendNdjsonLine(filepath, {
|
||||||
|
type: 'stream',
|
||||||
|
step: 'plan',
|
||||||
|
event: { type: 'text', data: { text: 'chunk1' } },
|
||||||
|
timestamp: '2025-01-01T00:00:01.000Z',
|
||||||
|
});
|
||||||
|
|
||||||
|
const after2 = readFileSync(filepath, 'utf-8').trim().split('\n');
|
||||||
|
expect(after2).toHaveLength(2);
|
||||||
|
// First line should still be workflow_start
|
||||||
|
expect(JSON.parse(after2[0]!).type).toBe('workflow_start');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should produce valid JSON on each line', () => {
|
||||||
|
const filepath = initNdjsonLog('sess-008', 'task', 'wf', projectDir);
|
||||||
|
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
appendNdjsonLine(filepath, {
|
||||||
|
type: 'stream',
|
||||||
|
step: 'plan',
|
||||||
|
event: { type: 'text', data: { text: `chunk-${i}` } },
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const content = readFileSync(filepath, 'utf-8');
|
||||||
|
const lines = content.trim().split('\n');
|
||||||
|
expect(lines).toHaveLength(6); // 1 init + 5 stream
|
||||||
|
|
||||||
|
// Every line should be valid JSON
|
||||||
|
for (const line of lines) {
|
||||||
|
expect(() => JSON.parse(line)).not.toThrow();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@ -24,10 +24,15 @@ import {
|
|||||||
import {
|
import {
|
||||||
generateSessionId,
|
generateSessionId,
|
||||||
createSessionLog,
|
createSessionLog,
|
||||||
addToSessionLog,
|
|
||||||
finalizeSessionLog,
|
finalizeSessionLog,
|
||||||
saveSessionLog,
|
|
||||||
updateLatestPointer,
|
updateLatestPointer,
|
||||||
|
initNdjsonLog,
|
||||||
|
appendNdjsonLine,
|
||||||
|
type NdjsonStepStart,
|
||||||
|
type NdjsonStepComplete,
|
||||||
|
type NdjsonStream,
|
||||||
|
type NdjsonWorkflowComplete,
|
||||||
|
type NdjsonWorkflowAbort,
|
||||||
} from '../utils/session.js';
|
} from '../utils/session.js';
|
||||||
import { createLogger } from '../utils/debug.js';
|
import { createLogger } from '../utils/debug.js';
|
||||||
import { notifySuccess, notifyError } from '../utils/notification.js';
|
import { notifySuccess, notifyError } from '../utils/notification.js';
|
||||||
@ -91,19 +96,34 @@ export async function executeWorkflow(
|
|||||||
header(`${headerPrefix} ${workflowConfig.name}`);
|
header(`${headerPrefix} ${workflowConfig.name}`);
|
||||||
|
|
||||||
const workflowSessionId = generateSessionId();
|
const workflowSessionId = generateSessionId();
|
||||||
const sessionLog = createSessionLog(task, projectCwd, workflowConfig.name);
|
let sessionLog = createSessionLog(task, projectCwd, workflowConfig.name);
|
||||||
|
|
||||||
// Persist initial log + pointer at workflow start (enables crash recovery)
|
// Initialize NDJSON log file + pointer at workflow start
|
||||||
saveSessionLog(sessionLog, workflowSessionId, projectCwd);
|
const ndjsonLogPath = initNdjsonLog(workflowSessionId, task, workflowConfig.name, projectCwd);
|
||||||
updateLatestPointer(sessionLog, workflowSessionId, projectCwd, { copyToPrevious: true });
|
updateLatestPointer(sessionLog, workflowSessionId, projectCwd, { copyToPrevious: true });
|
||||||
|
|
||||||
|
// Track current step name for stream log records
|
||||||
|
const stepRef: { current: string } = { current: '' };
|
||||||
|
|
||||||
// Track current display for streaming
|
// Track current display for streaming
|
||||||
const displayRef: { current: StreamDisplay | null } = { current: null };
|
const displayRef: { current: StreamDisplay | null } = { current: null };
|
||||||
|
|
||||||
// Create stream handler that delegates to current display
|
// Create stream handler that delegates to UI display + writes NDJSON log
|
||||||
const streamHandler = (
|
const streamHandler = (
|
||||||
event: Parameters<ReturnType<StreamDisplay['createHandler']>>[0]
|
event: Parameters<ReturnType<StreamDisplay['createHandler']>>[0]
|
||||||
): void => {
|
): void => {
|
||||||
|
// Write stream event to NDJSON log (real-time)
|
||||||
|
if (stepRef.current) {
|
||||||
|
const record: NdjsonStream = {
|
||||||
|
type: 'stream',
|
||||||
|
step: stepRef.current,
|
||||||
|
event,
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
};
|
||||||
|
appendNdjsonLine(ndjsonLogPath, record);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delegate to UI display
|
||||||
if (!displayRef.current) return;
|
if (!displayRef.current) return;
|
||||||
if (event.type === 'result') return;
|
if (event.type === 'result') return;
|
||||||
displayRef.current.createHandler()(event);
|
displayRef.current.createHandler()(event);
|
||||||
@ -183,6 +203,17 @@ export async function executeWorkflow(
|
|||||||
log.debug('Step starting', { step: step.name, agent: step.agentDisplayName, iteration });
|
log.debug('Step starting', { step: step.name, agent: step.agentDisplayName, iteration });
|
||||||
info(`[${iteration}/${workflowConfig.maxIterations}] ${step.name} (${step.agentDisplayName})`);
|
info(`[${iteration}/${workflowConfig.maxIterations}] ${step.name} (${step.agentDisplayName})`);
|
||||||
displayRef.current = new StreamDisplay(step.agentDisplayName);
|
displayRef.current = new StreamDisplay(step.agentDisplayName);
|
||||||
|
stepRef.current = step.name;
|
||||||
|
|
||||||
|
// Write step_start record to NDJSON log
|
||||||
|
const record: NdjsonStepStart = {
|
||||||
|
type: 'step_start',
|
||||||
|
step: step.name,
|
||||||
|
agent: step.agentDisplayName,
|
||||||
|
iteration,
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
};
|
||||||
|
appendNdjsonLine(ndjsonLogPath, record);
|
||||||
});
|
});
|
||||||
|
|
||||||
engine.on('step:complete', (step, response, instruction) => {
|
engine.on('step:complete', (step, response, instruction) => {
|
||||||
@ -219,10 +250,24 @@ export async function executeWorkflow(
|
|||||||
if (response.sessionId) {
|
if (response.sessionId) {
|
||||||
status('Session', response.sessionId);
|
status('Session', response.sessionId);
|
||||||
}
|
}
|
||||||
addToSessionLog(sessionLog, step.name, response, instruction);
|
|
||||||
|
|
||||||
// Incremental save after each step
|
// Write step_complete record to NDJSON log
|
||||||
saveSessionLog(sessionLog, workflowSessionId, projectCwd);
|
const record: NdjsonStepComplete = {
|
||||||
|
type: 'step_complete',
|
||||||
|
step: step.name,
|
||||||
|
agent: response.agent,
|
||||||
|
status: response.status,
|
||||||
|
content: response.content,
|
||||||
|
instruction,
|
||||||
|
...(response.matchedRuleIndex != null ? { matchedRuleIndex: response.matchedRuleIndex } : {}),
|
||||||
|
...(response.matchedRuleMethod ? { matchedRuleMethod: response.matchedRuleMethod } : {}),
|
||||||
|
...(response.error ? { error: response.error } : {}),
|
||||||
|
timestamp: response.timestamp.toISOString(),
|
||||||
|
};
|
||||||
|
appendNdjsonLine(ndjsonLogPath, record);
|
||||||
|
|
||||||
|
// Update in-memory log for pointer metadata (immutable)
|
||||||
|
sessionLog = { ...sessionLog, iterations: sessionLog.iterations + 1 };
|
||||||
updateLatestPointer(sessionLog, workflowSessionId, projectCwd);
|
updateLatestPointer(sessionLog, workflowSessionId, projectCwd);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -234,9 +279,15 @@ export async function executeWorkflow(
|
|||||||
|
|
||||||
engine.on('workflow:complete', (state) => {
|
engine.on('workflow:complete', (state) => {
|
||||||
log.info('Workflow completed successfully', { iterations: state.iteration });
|
log.info('Workflow completed successfully', { iterations: state.iteration });
|
||||||
finalizeSessionLog(sessionLog, 'completed');
|
sessionLog = finalizeSessionLog(sessionLog, 'completed');
|
||||||
// Save log to project root so user can find it easily
|
|
||||||
const logPath = saveSessionLog(sessionLog, workflowSessionId, projectCwd);
|
// Write workflow_complete record to NDJSON log
|
||||||
|
const record: NdjsonWorkflowComplete = {
|
||||||
|
type: 'workflow_complete',
|
||||||
|
iterations: state.iteration,
|
||||||
|
endTime: new Date().toISOString(),
|
||||||
|
};
|
||||||
|
appendNdjsonLine(ndjsonLogPath, record);
|
||||||
updateLatestPointer(sessionLog, workflowSessionId, projectCwd);
|
updateLatestPointer(sessionLog, workflowSessionId, projectCwd);
|
||||||
|
|
||||||
const elapsed = sessionLog.endTime
|
const elapsed = sessionLog.endTime
|
||||||
@ -245,7 +296,7 @@ export async function executeWorkflow(
|
|||||||
const elapsedDisplay = elapsed ? `, ${elapsed}` : '';
|
const elapsedDisplay = elapsed ? `, ${elapsed}` : '';
|
||||||
|
|
||||||
success(`Workflow completed (${state.iteration} iterations${elapsedDisplay})`);
|
success(`Workflow completed (${state.iteration} iterations${elapsedDisplay})`);
|
||||||
info(`Session log: ${logPath}`);
|
info(`Session log: ${ndjsonLogPath}`);
|
||||||
notifySuccess('TAKT', `ワークフロー完了 (${state.iteration} iterations)`);
|
notifySuccess('TAKT', `ワークフロー完了 (${state.iteration} iterations)`);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -256,9 +307,16 @@ export async function executeWorkflow(
|
|||||||
displayRef.current = null;
|
displayRef.current = null;
|
||||||
}
|
}
|
||||||
abortReason = reason;
|
abortReason = reason;
|
||||||
finalizeSessionLog(sessionLog, 'aborted');
|
sessionLog = finalizeSessionLog(sessionLog, 'aborted');
|
||||||
// Save log to project root so user can find it easily
|
|
||||||
const logPath = saveSessionLog(sessionLog, workflowSessionId, projectCwd);
|
// Write workflow_abort record to NDJSON log
|
||||||
|
const record: NdjsonWorkflowAbort = {
|
||||||
|
type: 'workflow_abort',
|
||||||
|
iterations: state.iteration,
|
||||||
|
reason,
|
||||||
|
endTime: new Date().toISOString(),
|
||||||
|
};
|
||||||
|
appendNdjsonLine(ndjsonLogPath, record);
|
||||||
updateLatestPointer(sessionLog, workflowSessionId, projectCwd);
|
updateLatestPointer(sessionLog, workflowSessionId, projectCwd);
|
||||||
|
|
||||||
const elapsed = sessionLog.endTime
|
const elapsed = sessionLog.endTime
|
||||||
@ -267,7 +325,7 @@ export async function executeWorkflow(
|
|||||||
const elapsedDisplay = elapsed ? ` (${elapsed})` : '';
|
const elapsedDisplay = elapsed ? ` (${elapsed})` : '';
|
||||||
|
|
||||||
error(`Workflow aborted after ${state.iteration} iterations${elapsedDisplay}: ${reason}`);
|
error(`Workflow aborted after ${state.iteration} iterations${elapsedDisplay}: ${reason}`);
|
||||||
info(`Session log: ${logPath}`);
|
info(`Session log: ${ndjsonLogPath}`);
|
||||||
notifyError('TAKT', `中断: ${reason}`);
|
notifyError('TAKT', `中断: ${reason}`);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@ -2,9 +2,9 @@
|
|||||||
* Session management utilities
|
* Session management utilities
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { existsSync, readFileSync, copyFileSync } from 'node:fs';
|
import { existsSync, readFileSync, copyFileSync, appendFileSync } from 'node:fs';
|
||||||
import { join } from 'node:path';
|
import { join } from 'node:path';
|
||||||
import type { AgentResponse, WorkflowState } from '../models/types.js';
|
import type { StreamEvent } from '../claude/types.js';
|
||||||
import { getProjectLogsDir, getGlobalLogsDir, ensureDir, writeFileAtomic } from '../config/paths.js';
|
import { getProjectLogsDir, getGlobalLogsDir, ensureDir, writeFileAtomic } from '../config/paths.js';
|
||||||
|
|
||||||
/** Session log entry */
|
/** Session log entry */
|
||||||
@ -31,6 +31,176 @@ export interface SessionLog {
|
|||||||
}>;
|
}>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- NDJSON log types ---
|
||||||
|
|
||||||
|
/** NDJSON record: workflow started */
|
||||||
|
export interface NdjsonWorkflowStart {
|
||||||
|
type: 'workflow_start';
|
||||||
|
task: string;
|
||||||
|
workflowName: string;
|
||||||
|
startTime: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** NDJSON record: step started */
|
||||||
|
export interface NdjsonStepStart {
|
||||||
|
type: 'step_start';
|
||||||
|
step: string;
|
||||||
|
agent: string;
|
||||||
|
iteration: number;
|
||||||
|
timestamp: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** NDJSON record: streaming chunk received */
|
||||||
|
export interface NdjsonStream {
|
||||||
|
type: 'stream';
|
||||||
|
step: string;
|
||||||
|
event: StreamEvent;
|
||||||
|
timestamp: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** NDJSON record: step completed */
|
||||||
|
export interface NdjsonStepComplete {
|
||||||
|
type: 'step_complete';
|
||||||
|
step: string;
|
||||||
|
agent: string;
|
||||||
|
status: string;
|
||||||
|
content: string;
|
||||||
|
instruction: string;
|
||||||
|
matchedRuleIndex?: number;
|
||||||
|
matchedRuleMethod?: string;
|
||||||
|
error?: string;
|
||||||
|
timestamp: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** NDJSON record: workflow completed successfully */
|
||||||
|
export interface NdjsonWorkflowComplete {
|
||||||
|
type: 'workflow_complete';
|
||||||
|
iterations: number;
|
||||||
|
endTime: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** NDJSON record: workflow aborted */
|
||||||
|
export interface NdjsonWorkflowAbort {
|
||||||
|
type: 'workflow_abort';
|
||||||
|
iterations: number;
|
||||||
|
reason: string;
|
||||||
|
endTime: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Union of all NDJSON record types */
|
||||||
|
export type NdjsonRecord =
|
||||||
|
| NdjsonWorkflowStart
|
||||||
|
| NdjsonStepStart
|
||||||
|
| NdjsonStream
|
||||||
|
| NdjsonStepComplete
|
||||||
|
| NdjsonWorkflowComplete
|
||||||
|
| NdjsonWorkflowAbort;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Append a single NDJSON line to a log file.
|
||||||
|
* Uses appendFileSync for atomic open→write→close (no file lock held).
|
||||||
|
*/
|
||||||
|
export function appendNdjsonLine(filepath: string, record: NdjsonRecord): void {
|
||||||
|
appendFileSync(filepath, JSON.stringify(record) + '\n', 'utf-8');
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize an NDJSON log file with the workflow_start record.
|
||||||
|
* Creates the logs directory if needed and returns the file path.
|
||||||
|
*/
|
||||||
|
export function initNdjsonLog(
|
||||||
|
sessionId: string,
|
||||||
|
task: string,
|
||||||
|
workflowName: string,
|
||||||
|
projectDir?: string,
|
||||||
|
): string {
|
||||||
|
const logsDir = projectDir
|
||||||
|
? getProjectLogsDir(projectDir)
|
||||||
|
: getGlobalLogsDir();
|
||||||
|
ensureDir(logsDir);
|
||||||
|
|
||||||
|
const filepath = join(logsDir, `${sessionId}.jsonl`);
|
||||||
|
const record: NdjsonWorkflowStart = {
|
||||||
|
type: 'workflow_start',
|
||||||
|
task,
|
||||||
|
workflowName,
|
||||||
|
startTime: new Date().toISOString(),
|
||||||
|
};
|
||||||
|
appendNdjsonLine(filepath, record);
|
||||||
|
return filepath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load an NDJSON log file and convert it to a SessionLog for backward compatibility.
|
||||||
|
* Parses each line as a JSON record and reconstructs the SessionLog structure.
|
||||||
|
*/
|
||||||
|
export function loadNdjsonLog(filepath: string): SessionLog | null {
|
||||||
|
if (!existsSync(filepath)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const content = readFileSync(filepath, 'utf-8');
|
||||||
|
const lines = content.trim().split('\n').filter((line) => line.length > 0);
|
||||||
|
if (lines.length === 0) return null;
|
||||||
|
|
||||||
|
let sessionLog: SessionLog | null = null;
|
||||||
|
|
||||||
|
for (const line of lines) {
|
||||||
|
const record = JSON.parse(line) as NdjsonRecord;
|
||||||
|
|
||||||
|
switch (record.type) {
|
||||||
|
case 'workflow_start':
|
||||||
|
sessionLog = {
|
||||||
|
task: record.task,
|
||||||
|
projectDir: '',
|
||||||
|
workflowName: record.workflowName,
|
||||||
|
iterations: 0,
|
||||||
|
startTime: record.startTime,
|
||||||
|
status: 'running',
|
||||||
|
history: [],
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 'step_complete':
|
||||||
|
if (sessionLog) {
|
||||||
|
sessionLog.history.push({
|
||||||
|
step: record.step,
|
||||||
|
agent: record.agent,
|
||||||
|
instruction: record.instruction,
|
||||||
|
status: record.status,
|
||||||
|
timestamp: record.timestamp,
|
||||||
|
content: record.content,
|
||||||
|
...(record.error ? { error: record.error } : {}),
|
||||||
|
...(record.matchedRuleIndex != null ? { matchedRuleIndex: record.matchedRuleIndex } : {}),
|
||||||
|
...(record.matchedRuleMethod ? { matchedRuleMethod: record.matchedRuleMethod } : {}),
|
||||||
|
});
|
||||||
|
sessionLog.iterations++;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 'workflow_complete':
|
||||||
|
if (sessionLog) {
|
||||||
|
sessionLog.status = 'completed';
|
||||||
|
sessionLog.endTime = record.endTime;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 'workflow_abort':
|
||||||
|
if (sessionLog) {
|
||||||
|
sessionLog.status = 'aborted';
|
||||||
|
sessionLog.endTime = record.endTime;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
// stream and step_start records are not stored in SessionLog
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sessionLog;
|
||||||
|
}
|
||||||
|
|
||||||
/** Generate a session ID */
|
/** Generate a session ID */
|
||||||
export function generateSessionId(): string {
|
export function generateSessionId(): string {
|
||||||
const timestamp = Date.now().toString(36);
|
const timestamp = Date.now().toString(36);
|
||||||
@ -77,56 +247,25 @@ export function createSessionLog(
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Add agent response to session log */
|
/** Create a finalized copy of a session log (immutable — does not modify the original) */
|
||||||
export function addToSessionLog(
|
|
||||||
log: SessionLog,
|
|
||||||
stepName: string,
|
|
||||||
response: AgentResponse,
|
|
||||||
instruction: string
|
|
||||||
): void {
|
|
||||||
log.history.push({
|
|
||||||
step: stepName,
|
|
||||||
agent: response.agent,
|
|
||||||
instruction,
|
|
||||||
status: response.status,
|
|
||||||
timestamp: response.timestamp.toISOString(),
|
|
||||||
content: response.content,
|
|
||||||
...(response.error ? { error: response.error } : {}),
|
|
||||||
...(response.matchedRuleIndex != null ? { matchedRuleIndex: response.matchedRuleIndex } : {}),
|
|
||||||
...(response.matchedRuleMethod ? { matchedRuleMethod: response.matchedRuleMethod } : {}),
|
|
||||||
});
|
|
||||||
log.iterations++;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Finalize session log */
|
|
||||||
export function finalizeSessionLog(
|
export function finalizeSessionLog(
|
||||||
log: SessionLog,
|
log: SessionLog,
|
||||||
status: 'completed' | 'aborted'
|
status: 'completed' | 'aborted'
|
||||||
): void {
|
): SessionLog {
|
||||||
log.status = status;
|
return {
|
||||||
log.endTime = new Date().toISOString();
|
...log,
|
||||||
|
status,
|
||||||
|
endTime: new Date().toISOString(),
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Save session log to file */
|
/** Load session log from file (supports both .json and .jsonl formats) */
|
||||||
export function saveSessionLog(
|
|
||||||
log: SessionLog,
|
|
||||||
sessionId: string,
|
|
||||||
projectDir?: string
|
|
||||||
): string {
|
|
||||||
const logsDir = projectDir
|
|
||||||
? getProjectLogsDir(projectDir)
|
|
||||||
: getGlobalLogsDir();
|
|
||||||
ensureDir(logsDir);
|
|
||||||
|
|
||||||
const filename = `${sessionId}.json`;
|
|
||||||
const filepath = join(logsDir, filename);
|
|
||||||
|
|
||||||
writeFileAtomic(filepath, JSON.stringify(log, null, 2));
|
|
||||||
return filepath;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Load session log from file */
|
|
||||||
export function loadSessionLog(filepath: string): SessionLog | null {
|
export function loadSessionLog(filepath: string): SessionLog | null {
|
||||||
|
// Try NDJSON format for .jsonl files
|
||||||
|
if (filepath.endsWith('.jsonl')) {
|
||||||
|
return loadNdjsonLog(filepath);
|
||||||
|
}
|
||||||
|
|
||||||
if (!existsSync(filepath)) {
|
if (!existsSync(filepath)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -191,7 +330,7 @@ export function updateLatestPointer(
|
|||||||
|
|
||||||
const pointer: LatestLogPointer = {
|
const pointer: LatestLogPointer = {
|
||||||
sessionId,
|
sessionId,
|
||||||
logFile: `${sessionId}.json`,
|
logFile: `${sessionId}.jsonl`,
|
||||||
task: log.task,
|
task: log.task,
|
||||||
workflowName: log.workflowName,
|
workflowName: log.workflowName,
|
||||||
status: log.status,
|
status: log.status,
|
||||||
@ -203,32 +342,3 @@ export function updateLatestPointer(
|
|||||||
writeFileAtomic(latestPath, JSON.stringify(pointer, null, 2));
|
writeFileAtomic(latestPath, JSON.stringify(pointer, null, 2));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert workflow state to session log */
|
|
||||||
export function workflowStateToSessionLog(
|
|
||||||
state: WorkflowState,
|
|
||||||
task: string,
|
|
||||||
projectDir: string
|
|
||||||
): SessionLog {
|
|
||||||
const log: SessionLog = {
|
|
||||||
task,
|
|
||||||
projectDir,
|
|
||||||
workflowName: state.workflowName,
|
|
||||||
iterations: state.iteration,
|
|
||||||
startTime: new Date().toISOString(),
|
|
||||||
status: state.status === 'running' ? 'running' : state.status === 'completed' ? 'completed' : 'aborted',
|
|
||||||
history: [],
|
|
||||||
};
|
|
||||||
|
|
||||||
for (const [stepName, response] of state.stepOutputs) {
|
|
||||||
log.history.push({
|
|
||||||
step: stepName,
|
|
||||||
agent: response.agent,
|
|
||||||
instruction: '',
|
|
||||||
status: response.status,
|
|
||||||
timestamp: response.timestamp.toISOString(),
|
|
||||||
content: response.content,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return log;
|
|
||||||
}
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user