feat: ログをNDJSON形式に変更し、ストリーミング出力をリアルタイムで書き出す (#27)
- ログフォーマットを単一JSONからNDJSON(.jsonl)に変更 - ストリーミングチャンク・ステップ開始/完了・ワークフロー完了/中断を逐次追記 - appendFileSyncでopen→write→closeを都度実行(tail -fで追跡可能) - レガシー.jsonファイルの読み込みも引き続きサポート - saveSessionLog/addToSessionLogを廃止、initNdjsonLog/appendNdjsonLineに置換
This commit is contained in:
parent
3430be91f2
commit
1fbc90823e
@ -8,10 +8,17 @@ import { join } from 'node:path';
|
||||
import { tmpdir } from 'node:os';
|
||||
import {
|
||||
createSessionLog,
|
||||
saveSessionLog,
|
||||
updateLatestPointer,
|
||||
initNdjsonLog,
|
||||
appendNdjsonLine,
|
||||
loadNdjsonLog,
|
||||
loadSessionLog,
|
||||
type LatestLogPointer,
|
||||
type SessionLog,
|
||||
type NdjsonRecord,
|
||||
type NdjsonStepComplete,
|
||||
type NdjsonWorkflowComplete,
|
||||
type NdjsonWorkflowAbort,
|
||||
} from '../utils/session.js';
|
||||
|
||||
/** Create a temp project directory with .takt/logs structure */
|
||||
@ -21,49 +28,6 @@ function createTempProject(): string {
|
||||
return dir;
|
||||
}
|
||||
|
||||
describe('saveSessionLog (atomic)', () => {
|
||||
let projectDir: string;
|
||||
|
||||
beforeEach(() => {
|
||||
projectDir = createTempProject();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
rmSync(projectDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it('should create session log file with correct content', () => {
|
||||
const log = createSessionLog('test task', projectDir, 'default');
|
||||
const sessionId = 'test-session-001';
|
||||
|
||||
const filepath = saveSessionLog(log, sessionId, projectDir);
|
||||
|
||||
expect(existsSync(filepath)).toBe(true);
|
||||
const content = JSON.parse(readFileSync(filepath, 'utf-8')) as SessionLog;
|
||||
expect(content.task).toBe('test task');
|
||||
expect(content.workflowName).toBe('default');
|
||||
expect(content.status).toBe('running');
|
||||
expect(content.iterations).toBe(0);
|
||||
expect(content.history).toEqual([]);
|
||||
});
|
||||
|
||||
it('should overwrite existing log file on subsequent saves', () => {
|
||||
const log = createSessionLog('test task', projectDir, 'default');
|
||||
const sessionId = 'test-session-002';
|
||||
|
||||
saveSessionLog(log, sessionId, projectDir);
|
||||
|
||||
log.iterations = 3;
|
||||
log.status = 'completed';
|
||||
saveSessionLog(log, sessionId, projectDir);
|
||||
|
||||
const filepath = join(projectDir, '.takt', 'logs', `${sessionId}.json`);
|
||||
const content = JSON.parse(readFileSync(filepath, 'utf-8')) as SessionLog;
|
||||
expect(content.iterations).toBe(3);
|
||||
expect(content.status).toBe('completed');
|
||||
});
|
||||
});
|
||||
|
||||
describe('updateLatestPointer', () => {
|
||||
let projectDir: string;
|
||||
|
||||
@ -86,7 +50,7 @@ describe('updateLatestPointer', () => {
|
||||
|
||||
const pointer = JSON.parse(readFileSync(latestPath, 'utf-8')) as LatestLogPointer;
|
||||
expect(pointer.sessionId).toBe('abc-123');
|
||||
expect(pointer.logFile).toBe('abc-123.json');
|
||||
expect(pointer.logFile).toBe('abc-123.jsonl');
|
||||
expect(pointer.task).toBe('my task');
|
||||
expect(pointer.workflowName).toBe('default');
|
||||
expect(pointer.status).toBe('running');
|
||||
@ -172,3 +136,314 @@ describe('updateLatestPointer', () => {
|
||||
expect(pointer.iterations).toBe(3);
|
||||
});
|
||||
});
|
||||
|
||||
describe('NDJSON log', () => {
|
||||
let projectDir: string;
|
||||
|
||||
beforeEach(() => {
|
||||
projectDir = createTempProject();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
rmSync(projectDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
describe('initNdjsonLog', () => {
|
||||
it('should create a .jsonl file with workflow_start record', () => {
|
||||
const filepath = initNdjsonLog('sess-001', 'my task', 'default', projectDir);
|
||||
|
||||
expect(filepath).toContain('sess-001.jsonl');
|
||||
expect(existsSync(filepath)).toBe(true);
|
||||
|
||||
const content = readFileSync(filepath, 'utf-8');
|
||||
const lines = content.trim().split('\n');
|
||||
expect(lines).toHaveLength(1);
|
||||
|
||||
const record = JSON.parse(lines[0]!) as NdjsonRecord;
|
||||
expect(record.type).toBe('workflow_start');
|
||||
if (record.type === 'workflow_start') {
|
||||
expect(record.task).toBe('my task');
|
||||
expect(record.workflowName).toBe('default');
|
||||
expect(record.startTime).toBeDefined();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('appendNdjsonLine', () => {
|
||||
it('should append records as individual lines', () => {
|
||||
const filepath = initNdjsonLog('sess-002', 'task', 'wf', projectDir);
|
||||
|
||||
const stepStart: NdjsonRecord = {
|
||||
type: 'step_start',
|
||||
step: 'plan',
|
||||
agent: 'planner',
|
||||
iteration: 1,
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
appendNdjsonLine(filepath, stepStart);
|
||||
|
||||
const streamRecord: NdjsonRecord = {
|
||||
type: 'stream',
|
||||
step: 'plan',
|
||||
event: { type: 'text', data: { text: 'hello' } },
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
appendNdjsonLine(filepath, streamRecord);
|
||||
|
||||
const content = readFileSync(filepath, 'utf-8');
|
||||
const lines = content.trim().split('\n');
|
||||
expect(lines).toHaveLength(3); // workflow_start + step_start + stream
|
||||
|
||||
const parsed0 = JSON.parse(lines[0]!) as NdjsonRecord;
|
||||
expect(parsed0.type).toBe('workflow_start');
|
||||
|
||||
const parsed1 = JSON.parse(lines[1]!) as NdjsonRecord;
|
||||
expect(parsed1.type).toBe('step_start');
|
||||
if (parsed1.type === 'step_start') {
|
||||
expect(parsed1.step).toBe('plan');
|
||||
expect(parsed1.agent).toBe('planner');
|
||||
expect(parsed1.iteration).toBe(1);
|
||||
}
|
||||
|
||||
const parsed2 = JSON.parse(lines[2]!) as NdjsonRecord;
|
||||
expect(parsed2.type).toBe('stream');
|
||||
if (parsed2.type === 'stream') {
|
||||
expect(parsed2.event.type).toBe('text');
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('loadNdjsonLog', () => {
|
||||
it('should reconstruct SessionLog from NDJSON file', () => {
|
||||
const filepath = initNdjsonLog('sess-003', 'build app', 'default', projectDir);
|
||||
|
||||
// Add step_start + step_complete
|
||||
appendNdjsonLine(filepath, {
|
||||
type: 'step_start',
|
||||
step: 'plan',
|
||||
agent: 'planner',
|
||||
iteration: 1,
|
||||
timestamp: '2025-01-01T00:00:01.000Z',
|
||||
});
|
||||
|
||||
const stepComplete: NdjsonStepComplete = {
|
||||
type: 'step_complete',
|
||||
step: 'plan',
|
||||
agent: 'planner',
|
||||
status: 'done',
|
||||
content: 'Plan completed',
|
||||
instruction: 'Create a plan',
|
||||
matchedRuleIndex: 0,
|
||||
matchedRuleMethod: 'phase3_tag',
|
||||
timestamp: '2025-01-01T00:00:02.000Z',
|
||||
};
|
||||
appendNdjsonLine(filepath, stepComplete);
|
||||
|
||||
const complete: NdjsonWorkflowComplete = {
|
||||
type: 'workflow_complete',
|
||||
iterations: 1,
|
||||
endTime: '2025-01-01T00:00:03.000Z',
|
||||
};
|
||||
appendNdjsonLine(filepath, complete);
|
||||
|
||||
const log = loadNdjsonLog(filepath);
|
||||
expect(log).not.toBeNull();
|
||||
expect(log!.task).toBe('build app');
|
||||
expect(log!.workflowName).toBe('default');
|
||||
expect(log!.status).toBe('completed');
|
||||
expect(log!.iterations).toBe(1);
|
||||
expect(log!.endTime).toBe('2025-01-01T00:00:03.000Z');
|
||||
expect(log!.history).toHaveLength(1);
|
||||
expect(log!.history[0]!.step).toBe('plan');
|
||||
expect(log!.history[0]!.content).toBe('Plan completed');
|
||||
expect(log!.history[0]!.matchedRuleIndex).toBe(0);
|
||||
expect(log!.history[0]!.matchedRuleMethod).toBe('phase3_tag');
|
||||
});
|
||||
|
||||
it('should handle aborted workflow', () => {
|
||||
const filepath = initNdjsonLog('sess-004', 'failing task', 'wf', projectDir);
|
||||
|
||||
appendNdjsonLine(filepath, {
|
||||
type: 'step_start',
|
||||
step: 'impl',
|
||||
agent: 'coder',
|
||||
iteration: 1,
|
||||
timestamp: '2025-01-01T00:00:01.000Z',
|
||||
});
|
||||
|
||||
appendNdjsonLine(filepath, {
|
||||
type: 'step_complete',
|
||||
step: 'impl',
|
||||
agent: 'coder',
|
||||
status: 'error',
|
||||
content: 'Failed',
|
||||
instruction: 'Do the thing',
|
||||
error: 'compile error',
|
||||
timestamp: '2025-01-01T00:00:02.000Z',
|
||||
} satisfies NdjsonStepComplete);
|
||||
|
||||
const abort: NdjsonWorkflowAbort = {
|
||||
type: 'workflow_abort',
|
||||
iterations: 1,
|
||||
reason: 'Max iterations reached',
|
||||
endTime: '2025-01-01T00:00:03.000Z',
|
||||
};
|
||||
appendNdjsonLine(filepath, abort);
|
||||
|
||||
const log = loadNdjsonLog(filepath);
|
||||
expect(log).not.toBeNull();
|
||||
expect(log!.status).toBe('aborted');
|
||||
expect(log!.history[0]!.error).toBe('compile error');
|
||||
});
|
||||
|
||||
it('should return null for non-existent file', () => {
|
||||
const result = loadNdjsonLog('/nonexistent/path.jsonl');
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
it('should return null for empty file', () => {
|
||||
const logsDir = join(projectDir, '.takt', 'logs');
|
||||
mkdirSync(logsDir, { recursive: true });
|
||||
const filepath = join(logsDir, 'empty.jsonl');
|
||||
writeFileSync(filepath, '', 'utf-8');
|
||||
|
||||
const result = loadNdjsonLog(filepath);
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
it('should skip stream and step_start records when reconstructing SessionLog', () => {
|
||||
const filepath = initNdjsonLog('sess-005', 'task', 'wf', projectDir);
|
||||
|
||||
// Add various records
|
||||
appendNdjsonLine(filepath, {
|
||||
type: 'step_start',
|
||||
step: 'plan',
|
||||
agent: 'planner',
|
||||
iteration: 1,
|
||||
timestamp: '2025-01-01T00:00:01.000Z',
|
||||
});
|
||||
|
||||
appendNdjsonLine(filepath, {
|
||||
type: 'stream',
|
||||
step: 'plan',
|
||||
event: { type: 'text', data: { text: 'working...' } },
|
||||
timestamp: '2025-01-01T00:00:01.500Z',
|
||||
});
|
||||
|
||||
appendNdjsonLine(filepath, {
|
||||
type: 'step_complete',
|
||||
step: 'plan',
|
||||
agent: 'planner',
|
||||
status: 'done',
|
||||
content: 'Done',
|
||||
instruction: 'Plan it',
|
||||
timestamp: '2025-01-01T00:00:02.000Z',
|
||||
} satisfies NdjsonStepComplete);
|
||||
|
||||
appendNdjsonLine(filepath, {
|
||||
type: 'workflow_complete',
|
||||
iterations: 1,
|
||||
endTime: '2025-01-01T00:00:03.000Z',
|
||||
});
|
||||
|
||||
const log = loadNdjsonLog(filepath);
|
||||
expect(log).not.toBeNull();
|
||||
// Only step_complete adds to history
|
||||
expect(log!.history).toHaveLength(1);
|
||||
expect(log!.iterations).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('loadSessionLog with .jsonl extension', () => {
|
||||
it('should delegate to loadNdjsonLog for .jsonl files', () => {
|
||||
const filepath = initNdjsonLog('sess-006', 'jsonl task', 'wf', projectDir);
|
||||
|
||||
appendNdjsonLine(filepath, {
|
||||
type: 'step_complete',
|
||||
step: 'plan',
|
||||
agent: 'planner',
|
||||
status: 'done',
|
||||
content: 'Plan done',
|
||||
instruction: 'Plan',
|
||||
timestamp: '2025-01-01T00:00:02.000Z',
|
||||
} satisfies NdjsonStepComplete);
|
||||
|
||||
appendNdjsonLine(filepath, {
|
||||
type: 'workflow_complete',
|
||||
iterations: 1,
|
||||
endTime: '2025-01-01T00:00:03.000Z',
|
||||
});
|
||||
|
||||
// loadSessionLog should handle .jsonl
|
||||
const log = loadSessionLog(filepath);
|
||||
expect(log).not.toBeNull();
|
||||
expect(log!.task).toBe('jsonl task');
|
||||
expect(log!.status).toBe('completed');
|
||||
});
|
||||
|
||||
it('should still load legacy .json files', () => {
|
||||
const logsDir = join(projectDir, '.takt', 'logs');
|
||||
mkdirSync(logsDir, { recursive: true });
|
||||
const legacyPath = join(logsDir, 'legacy-001.json');
|
||||
const legacyLog: SessionLog = {
|
||||
task: 'legacy task',
|
||||
projectDir,
|
||||
workflowName: 'wf',
|
||||
iterations: 0,
|
||||
startTime: new Date().toISOString(),
|
||||
status: 'running',
|
||||
history: [],
|
||||
};
|
||||
writeFileSync(legacyPath, JSON.stringify(legacyLog, null, 2), 'utf-8');
|
||||
|
||||
const log = loadSessionLog(legacyPath);
|
||||
expect(log).not.toBeNull();
|
||||
expect(log!.task).toBe('legacy task');
|
||||
});
|
||||
});
|
||||
|
||||
describe('appendNdjsonLine real-time characteristics', () => {
|
||||
it('should append without overwriting previous content', () => {
|
||||
const filepath = initNdjsonLog('sess-007', 'task', 'wf', projectDir);
|
||||
|
||||
// Read after init
|
||||
const after1 = readFileSync(filepath, 'utf-8').trim().split('\n');
|
||||
expect(after1).toHaveLength(1);
|
||||
|
||||
// Append more records
|
||||
appendNdjsonLine(filepath, {
|
||||
type: 'stream',
|
||||
step: 'plan',
|
||||
event: { type: 'text', data: { text: 'chunk1' } },
|
||||
timestamp: '2025-01-01T00:00:01.000Z',
|
||||
});
|
||||
|
||||
const after2 = readFileSync(filepath, 'utf-8').trim().split('\n');
|
||||
expect(after2).toHaveLength(2);
|
||||
// First line should still be workflow_start
|
||||
expect(JSON.parse(after2[0]!).type).toBe('workflow_start');
|
||||
});
|
||||
|
||||
it('should produce valid JSON on each line', () => {
|
||||
const filepath = initNdjsonLog('sess-008', 'task', 'wf', projectDir);
|
||||
|
||||
for (let i = 0; i < 5; i++) {
|
||||
appendNdjsonLine(filepath, {
|
||||
type: 'stream',
|
||||
step: 'plan',
|
||||
event: { type: 'text', data: { text: `chunk-${i}` } },
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
const content = readFileSync(filepath, 'utf-8');
|
||||
const lines = content.trim().split('\n');
|
||||
expect(lines).toHaveLength(6); // 1 init + 5 stream
|
||||
|
||||
// Every line should be valid JSON
|
||||
for (const line of lines) {
|
||||
expect(() => JSON.parse(line)).not.toThrow();
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -24,10 +24,15 @@ import {
|
||||
import {
|
||||
generateSessionId,
|
||||
createSessionLog,
|
||||
addToSessionLog,
|
||||
finalizeSessionLog,
|
||||
saveSessionLog,
|
||||
updateLatestPointer,
|
||||
initNdjsonLog,
|
||||
appendNdjsonLine,
|
||||
type NdjsonStepStart,
|
||||
type NdjsonStepComplete,
|
||||
type NdjsonStream,
|
||||
type NdjsonWorkflowComplete,
|
||||
type NdjsonWorkflowAbort,
|
||||
} from '../utils/session.js';
|
||||
import { createLogger } from '../utils/debug.js';
|
||||
import { notifySuccess, notifyError } from '../utils/notification.js';
|
||||
@ -91,19 +96,34 @@ export async function executeWorkflow(
|
||||
header(`${headerPrefix} ${workflowConfig.name}`);
|
||||
|
||||
const workflowSessionId = generateSessionId();
|
||||
const sessionLog = createSessionLog(task, projectCwd, workflowConfig.name);
|
||||
let sessionLog = createSessionLog(task, projectCwd, workflowConfig.name);
|
||||
|
||||
// Persist initial log + pointer at workflow start (enables crash recovery)
|
||||
saveSessionLog(sessionLog, workflowSessionId, projectCwd);
|
||||
// Initialize NDJSON log file + pointer at workflow start
|
||||
const ndjsonLogPath = initNdjsonLog(workflowSessionId, task, workflowConfig.name, projectCwd);
|
||||
updateLatestPointer(sessionLog, workflowSessionId, projectCwd, { copyToPrevious: true });
|
||||
|
||||
// Track current step name for stream log records
|
||||
const stepRef: { current: string } = { current: '' };
|
||||
|
||||
// Track current display for streaming
|
||||
const displayRef: { current: StreamDisplay | null } = { current: null };
|
||||
|
||||
// Create stream handler that delegates to current display
|
||||
// Create stream handler that delegates to UI display + writes NDJSON log
|
||||
const streamHandler = (
|
||||
event: Parameters<ReturnType<StreamDisplay['createHandler']>>[0]
|
||||
): void => {
|
||||
// Write stream event to NDJSON log (real-time)
|
||||
if (stepRef.current) {
|
||||
const record: NdjsonStream = {
|
||||
type: 'stream',
|
||||
step: stepRef.current,
|
||||
event,
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
appendNdjsonLine(ndjsonLogPath, record);
|
||||
}
|
||||
|
||||
// Delegate to UI display
|
||||
if (!displayRef.current) return;
|
||||
if (event.type === 'result') return;
|
||||
displayRef.current.createHandler()(event);
|
||||
@ -183,6 +203,17 @@ export async function executeWorkflow(
|
||||
log.debug('Step starting', { step: step.name, agent: step.agentDisplayName, iteration });
|
||||
info(`[${iteration}/${workflowConfig.maxIterations}] ${step.name} (${step.agentDisplayName})`);
|
||||
displayRef.current = new StreamDisplay(step.agentDisplayName);
|
||||
stepRef.current = step.name;
|
||||
|
||||
// Write step_start record to NDJSON log
|
||||
const record: NdjsonStepStart = {
|
||||
type: 'step_start',
|
||||
step: step.name,
|
||||
agent: step.agentDisplayName,
|
||||
iteration,
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
appendNdjsonLine(ndjsonLogPath, record);
|
||||
});
|
||||
|
||||
engine.on('step:complete', (step, response, instruction) => {
|
||||
@ -219,10 +250,24 @@ export async function executeWorkflow(
|
||||
if (response.sessionId) {
|
||||
status('Session', response.sessionId);
|
||||
}
|
||||
addToSessionLog(sessionLog, step.name, response, instruction);
|
||||
|
||||
// Incremental save after each step
|
||||
saveSessionLog(sessionLog, workflowSessionId, projectCwd);
|
||||
// Write step_complete record to NDJSON log
|
||||
const record: NdjsonStepComplete = {
|
||||
type: 'step_complete',
|
||||
step: step.name,
|
||||
agent: response.agent,
|
||||
status: response.status,
|
||||
content: response.content,
|
||||
instruction,
|
||||
...(response.matchedRuleIndex != null ? { matchedRuleIndex: response.matchedRuleIndex } : {}),
|
||||
...(response.matchedRuleMethod ? { matchedRuleMethod: response.matchedRuleMethod } : {}),
|
||||
...(response.error ? { error: response.error } : {}),
|
||||
timestamp: response.timestamp.toISOString(),
|
||||
};
|
||||
appendNdjsonLine(ndjsonLogPath, record);
|
||||
|
||||
// Update in-memory log for pointer metadata (immutable)
|
||||
sessionLog = { ...sessionLog, iterations: sessionLog.iterations + 1 };
|
||||
updateLatestPointer(sessionLog, workflowSessionId, projectCwd);
|
||||
});
|
||||
|
||||
@ -234,9 +279,15 @@ export async function executeWorkflow(
|
||||
|
||||
engine.on('workflow:complete', (state) => {
|
||||
log.info('Workflow completed successfully', { iterations: state.iteration });
|
||||
finalizeSessionLog(sessionLog, 'completed');
|
||||
// Save log to project root so user can find it easily
|
||||
const logPath = saveSessionLog(sessionLog, workflowSessionId, projectCwd);
|
||||
sessionLog = finalizeSessionLog(sessionLog, 'completed');
|
||||
|
||||
// Write workflow_complete record to NDJSON log
|
||||
const record: NdjsonWorkflowComplete = {
|
||||
type: 'workflow_complete',
|
||||
iterations: state.iteration,
|
||||
endTime: new Date().toISOString(),
|
||||
};
|
||||
appendNdjsonLine(ndjsonLogPath, record);
|
||||
updateLatestPointer(sessionLog, workflowSessionId, projectCwd);
|
||||
|
||||
const elapsed = sessionLog.endTime
|
||||
@ -245,7 +296,7 @@ export async function executeWorkflow(
|
||||
const elapsedDisplay = elapsed ? `, ${elapsed}` : '';
|
||||
|
||||
success(`Workflow completed (${state.iteration} iterations${elapsedDisplay})`);
|
||||
info(`Session log: ${logPath}`);
|
||||
info(`Session log: ${ndjsonLogPath}`);
|
||||
notifySuccess('TAKT', `ワークフロー完了 (${state.iteration} iterations)`);
|
||||
});
|
||||
|
||||
@ -256,9 +307,16 @@ export async function executeWorkflow(
|
||||
displayRef.current = null;
|
||||
}
|
||||
abortReason = reason;
|
||||
finalizeSessionLog(sessionLog, 'aborted');
|
||||
// Save log to project root so user can find it easily
|
||||
const logPath = saveSessionLog(sessionLog, workflowSessionId, projectCwd);
|
||||
sessionLog = finalizeSessionLog(sessionLog, 'aborted');
|
||||
|
||||
// Write workflow_abort record to NDJSON log
|
||||
const record: NdjsonWorkflowAbort = {
|
||||
type: 'workflow_abort',
|
||||
iterations: state.iteration,
|
||||
reason,
|
||||
endTime: new Date().toISOString(),
|
||||
};
|
||||
appendNdjsonLine(ndjsonLogPath, record);
|
||||
updateLatestPointer(sessionLog, workflowSessionId, projectCwd);
|
||||
|
||||
const elapsed = sessionLog.endTime
|
||||
@ -267,7 +325,7 @@ export async function executeWorkflow(
|
||||
const elapsedDisplay = elapsed ? ` (${elapsed})` : '';
|
||||
|
||||
error(`Workflow aborted after ${state.iteration} iterations${elapsedDisplay}: ${reason}`);
|
||||
info(`Session log: ${logPath}`);
|
||||
info(`Session log: ${ndjsonLogPath}`);
|
||||
notifyError('TAKT', `中断: ${reason}`);
|
||||
});
|
||||
|
||||
|
||||
@ -2,9 +2,9 @@
|
||||
* Session management utilities
|
||||
*/
|
||||
|
||||
import { existsSync, readFileSync, copyFileSync } from 'node:fs';
|
||||
import { existsSync, readFileSync, copyFileSync, appendFileSync } from 'node:fs';
|
||||
import { join } from 'node:path';
|
||||
import type { AgentResponse, WorkflowState } from '../models/types.js';
|
||||
import type { StreamEvent } from '../claude/types.js';
|
||||
import { getProjectLogsDir, getGlobalLogsDir, ensureDir, writeFileAtomic } from '../config/paths.js';
|
||||
|
||||
/** Session log entry */
|
||||
@ -31,6 +31,176 @@ export interface SessionLog {
|
||||
}>;
|
||||
}
|
||||
|
||||
// --- NDJSON log types ---
|
||||
|
||||
/** NDJSON record: workflow started */
|
||||
export interface NdjsonWorkflowStart {
|
||||
type: 'workflow_start';
|
||||
task: string;
|
||||
workflowName: string;
|
||||
startTime: string;
|
||||
}
|
||||
|
||||
/** NDJSON record: step started */
|
||||
export interface NdjsonStepStart {
|
||||
type: 'step_start';
|
||||
step: string;
|
||||
agent: string;
|
||||
iteration: number;
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
/** NDJSON record: streaming chunk received */
|
||||
export interface NdjsonStream {
|
||||
type: 'stream';
|
||||
step: string;
|
||||
event: StreamEvent;
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
/** NDJSON record: step completed */
|
||||
export interface NdjsonStepComplete {
|
||||
type: 'step_complete';
|
||||
step: string;
|
||||
agent: string;
|
||||
status: string;
|
||||
content: string;
|
||||
instruction: string;
|
||||
matchedRuleIndex?: number;
|
||||
matchedRuleMethod?: string;
|
||||
error?: string;
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
/** NDJSON record: workflow completed successfully */
|
||||
export interface NdjsonWorkflowComplete {
|
||||
type: 'workflow_complete';
|
||||
iterations: number;
|
||||
endTime: string;
|
||||
}
|
||||
|
||||
/** NDJSON record: workflow aborted */
|
||||
export interface NdjsonWorkflowAbort {
|
||||
type: 'workflow_abort';
|
||||
iterations: number;
|
||||
reason: string;
|
||||
endTime: string;
|
||||
}
|
||||
|
||||
/** Union of all NDJSON record types */
|
||||
export type NdjsonRecord =
|
||||
| NdjsonWorkflowStart
|
||||
| NdjsonStepStart
|
||||
| NdjsonStream
|
||||
| NdjsonStepComplete
|
||||
| NdjsonWorkflowComplete
|
||||
| NdjsonWorkflowAbort;
|
||||
|
||||
/**
|
||||
* Append a single NDJSON line to a log file.
|
||||
* Uses appendFileSync for atomic open→write→close (no file lock held).
|
||||
*/
|
||||
export function appendNdjsonLine(filepath: string, record: NdjsonRecord): void {
|
||||
appendFileSync(filepath, JSON.stringify(record) + '\n', 'utf-8');
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize an NDJSON log file with the workflow_start record.
|
||||
* Creates the logs directory if needed and returns the file path.
|
||||
*/
|
||||
export function initNdjsonLog(
|
||||
sessionId: string,
|
||||
task: string,
|
||||
workflowName: string,
|
||||
projectDir?: string,
|
||||
): string {
|
||||
const logsDir = projectDir
|
||||
? getProjectLogsDir(projectDir)
|
||||
: getGlobalLogsDir();
|
||||
ensureDir(logsDir);
|
||||
|
||||
const filepath = join(logsDir, `${sessionId}.jsonl`);
|
||||
const record: NdjsonWorkflowStart = {
|
||||
type: 'workflow_start',
|
||||
task,
|
||||
workflowName,
|
||||
startTime: new Date().toISOString(),
|
||||
};
|
||||
appendNdjsonLine(filepath, record);
|
||||
return filepath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load an NDJSON log file and convert it to a SessionLog for backward compatibility.
|
||||
* Parses each line as a JSON record and reconstructs the SessionLog structure.
|
||||
*/
|
||||
export function loadNdjsonLog(filepath: string): SessionLog | null {
|
||||
if (!existsSync(filepath)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const content = readFileSync(filepath, 'utf-8');
|
||||
const lines = content.trim().split('\n').filter((line) => line.length > 0);
|
||||
if (lines.length === 0) return null;
|
||||
|
||||
let sessionLog: SessionLog | null = null;
|
||||
|
||||
for (const line of lines) {
|
||||
const record = JSON.parse(line) as NdjsonRecord;
|
||||
|
||||
switch (record.type) {
|
||||
case 'workflow_start':
|
||||
sessionLog = {
|
||||
task: record.task,
|
||||
projectDir: '',
|
||||
workflowName: record.workflowName,
|
||||
iterations: 0,
|
||||
startTime: record.startTime,
|
||||
status: 'running',
|
||||
history: [],
|
||||
};
|
||||
break;
|
||||
|
||||
case 'step_complete':
|
||||
if (sessionLog) {
|
||||
sessionLog.history.push({
|
||||
step: record.step,
|
||||
agent: record.agent,
|
||||
instruction: record.instruction,
|
||||
status: record.status,
|
||||
timestamp: record.timestamp,
|
||||
content: record.content,
|
||||
...(record.error ? { error: record.error } : {}),
|
||||
...(record.matchedRuleIndex != null ? { matchedRuleIndex: record.matchedRuleIndex } : {}),
|
||||
...(record.matchedRuleMethod ? { matchedRuleMethod: record.matchedRuleMethod } : {}),
|
||||
});
|
||||
sessionLog.iterations++;
|
||||
}
|
||||
break;
|
||||
|
||||
case 'workflow_complete':
|
||||
if (sessionLog) {
|
||||
sessionLog.status = 'completed';
|
||||
sessionLog.endTime = record.endTime;
|
||||
}
|
||||
break;
|
||||
|
||||
case 'workflow_abort':
|
||||
if (sessionLog) {
|
||||
sessionLog.status = 'aborted';
|
||||
sessionLog.endTime = record.endTime;
|
||||
}
|
||||
break;
|
||||
|
||||
// stream and step_start records are not stored in SessionLog
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return sessionLog;
|
||||
}
|
||||
|
||||
/** Generate a session ID */
|
||||
export function generateSessionId(): string {
|
||||
const timestamp = Date.now().toString(36);
|
||||
@ -77,56 +247,25 @@ export function createSessionLog(
|
||||
};
|
||||
}
|
||||
|
||||
/** Add agent response to session log */
|
||||
export function addToSessionLog(
|
||||
log: SessionLog,
|
||||
stepName: string,
|
||||
response: AgentResponse,
|
||||
instruction: string
|
||||
): void {
|
||||
log.history.push({
|
||||
step: stepName,
|
||||
agent: response.agent,
|
||||
instruction,
|
||||
status: response.status,
|
||||
timestamp: response.timestamp.toISOString(),
|
||||
content: response.content,
|
||||
...(response.error ? { error: response.error } : {}),
|
||||
...(response.matchedRuleIndex != null ? { matchedRuleIndex: response.matchedRuleIndex } : {}),
|
||||
...(response.matchedRuleMethod ? { matchedRuleMethod: response.matchedRuleMethod } : {}),
|
||||
});
|
||||
log.iterations++;
|
||||
}
|
||||
|
||||
/** Finalize session log */
|
||||
/** Create a finalized copy of a session log (immutable — does not modify the original) */
|
||||
export function finalizeSessionLog(
|
||||
log: SessionLog,
|
||||
status: 'completed' | 'aborted'
|
||||
): void {
|
||||
log.status = status;
|
||||
log.endTime = new Date().toISOString();
|
||||
): SessionLog {
|
||||
return {
|
||||
...log,
|
||||
status,
|
||||
endTime: new Date().toISOString(),
|
||||
};
|
||||
}
|
||||
|
||||
/** Save session log to file */
|
||||
export function saveSessionLog(
|
||||
log: SessionLog,
|
||||
sessionId: string,
|
||||
projectDir?: string
|
||||
): string {
|
||||
const logsDir = projectDir
|
||||
? getProjectLogsDir(projectDir)
|
||||
: getGlobalLogsDir();
|
||||
ensureDir(logsDir);
|
||||
|
||||
const filename = `${sessionId}.json`;
|
||||
const filepath = join(logsDir, filename);
|
||||
|
||||
writeFileAtomic(filepath, JSON.stringify(log, null, 2));
|
||||
return filepath;
|
||||
}
|
||||
|
||||
/** Load session log from file */
|
||||
/** Load session log from file (supports both .json and .jsonl formats) */
|
||||
export function loadSessionLog(filepath: string): SessionLog | null {
|
||||
// Try NDJSON format for .jsonl files
|
||||
if (filepath.endsWith('.jsonl')) {
|
||||
return loadNdjsonLog(filepath);
|
||||
}
|
||||
|
||||
if (!existsSync(filepath)) {
|
||||
return null;
|
||||
}
|
||||
@ -191,7 +330,7 @@ export function updateLatestPointer(
|
||||
|
||||
const pointer: LatestLogPointer = {
|
||||
sessionId,
|
||||
logFile: `${sessionId}.json`,
|
||||
logFile: `${sessionId}.jsonl`,
|
||||
task: log.task,
|
||||
workflowName: log.workflowName,
|
||||
status: log.status,
|
||||
@ -203,32 +342,3 @@ export function updateLatestPointer(
|
||||
writeFileAtomic(latestPath, JSON.stringify(pointer, null, 2));
|
||||
}
|
||||
|
||||
/** Convert workflow state to session log */
|
||||
export function workflowStateToSessionLog(
|
||||
state: WorkflowState,
|
||||
task: string,
|
||||
projectDir: string
|
||||
): SessionLog {
|
||||
const log: SessionLog = {
|
||||
task,
|
||||
projectDir,
|
||||
workflowName: state.workflowName,
|
||||
iterations: state.iteration,
|
||||
startTime: new Date().toISOString(),
|
||||
status: state.status === 'running' ? 'running' : state.status === 'completed' ? 'completed' : 'aborted',
|
||||
history: [],
|
||||
};
|
||||
|
||||
for (const [stepName, response] of state.stepOutputs) {
|
||||
log.history.push({
|
||||
step: stepName,
|
||||
agent: response.agent,
|
||||
instruction: '',
|
||||
status: response.status,
|
||||
timestamp: response.timestamp.toISOString(),
|
||||
content: response.content,
|
||||
});
|
||||
}
|
||||
|
||||
return log;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user