diff --git a/src/__tests__/globalConfig-defaults.test.ts b/src/__tests__/globalConfig-defaults.test.ts index 4753732..d8a5ccc 100644 --- a/src/__tests__/globalConfig-defaults.test.ts +++ b/src/__tests__/globalConfig-defaults.test.ts @@ -314,6 +314,43 @@ describe('loadGlobalConfig', () => { }); }); + it('should load observability.provider_events config from config.yaml', () => { + const taktDir = join(testHomeDir, '.takt'); + mkdirSync(taktDir, { recursive: true }); + writeFileSync( + getGlobalConfigPath(), + [ + 'language: en', + 'observability:', + ' provider_events: false', + ].join('\n'), + 'utf-8', + ); + + const config = loadGlobalConfig(); + expect(config.observability).toEqual({ + providerEvents: false, + }); + }); + + it('should save and reload observability.provider_events config', () => { + const taktDir = join(testHomeDir, '.takt'); + mkdirSync(taktDir, { recursive: true }); + writeFileSync(getGlobalConfigPath(), 'language: en\n', 'utf-8'); + + const config = loadGlobalConfig(); + config.observability = { + providerEvents: false, + }; + saveGlobalConfig(config); + invalidateGlobalConfigCache(); + + const reloaded = loadGlobalConfig(); + expect(reloaded.observability).toEqual({ + providerEvents: false, + }); + }); + it('should save and reload notification_sound_events config', () => { const taktDir = join(testHomeDir, '.takt'); mkdirSync(taktDir, { recursive: true }); diff --git a/src/__tests__/models.test.ts b/src/__tests__/models.test.ts index 20139f3..0709d02 100644 --- a/src/__tests__/models.test.ts +++ b/src/__tests__/models.test.ts @@ -410,15 +410,20 @@ describe('GlobalConfigSchema', () => { expect(result.default_piece).toBe('default'); expect(result.log_level).toBe('info'); expect(result.provider).toBe('claude'); + expect(result.observability).toBeUndefined(); }); it('should accept valid config', () => { const config = { default_piece: 'custom', log_level: 'debug' as const, + observability: { + provider_events: false, + }, }; const result = GlobalConfigSchema.parse(config); expect(result.log_level).toBe('debug'); + expect(result.observability?.provider_events).toBe(false); }); }); diff --git a/src/__tests__/providerEventLogger.test.ts b/src/__tests__/providerEventLogger.test.ts new file mode 100644 index 0000000..08d361f --- /dev/null +++ b/src/__tests__/providerEventLogger.test.ts @@ -0,0 +1,188 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { existsSync, mkdirSync, readFileSync, rmSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { + createProviderEventLogger, + isProviderEventsEnabled, +} from '../shared/utils/providerEventLogger.js'; +import type { ProviderType } from '../core/piece/index.js'; + +describe('providerEventLogger', () => { + let tempDir: string; + + beforeEach(() => { + tempDir = join(tmpdir(), `takt-provider-events-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`); + mkdirSync(tempDir, { recursive: true }); + }); + + afterEach(() => { + rmSync(tempDir, { recursive: true, force: true }); + }); + + it('should enable provider events by default', () => { + expect(isProviderEventsEnabled()).toBe(true); + expect(isProviderEventsEnabled({})).toBe(true); + expect(isProviderEventsEnabled({ observability: {} })).toBe(true); + expect(isProviderEventsEnabled({ observability: { providerEvents: true } })).toBe(true); + }); + + it('should disable provider events only when explicitly false', () => { + expect(isProviderEventsEnabled({ observability: { providerEvents: false } })).toBe(false); + }); + + it('should write normalized JSONL records when enabled', () => { + const logger = createProviderEventLogger({ + logsDir: tempDir, + sessionId: 'session-1', + runId: 'run-1', + provider: 'opencode', + movement: 'implement', + enabled: true, + }); + + const original = vi.fn(); + const wrapped = logger.wrapCallback(original); + + wrapped({ + type: 'tool_use', + data: { + tool: 'Read', + id: 'call-123', + messageId: 'msg-123', + requestId: 'req-123', + sessionID: 'session-abc', + }, + }); + + expect(original).toHaveBeenCalledTimes(1); + expect(existsSync(logger.filepath)).toBe(true); + + const lines = readFileSync(logger.filepath, 'utf-8').trim().split('\n'); + expect(lines).toHaveLength(1); + + const parsed = JSON.parse(lines[0]!) as { + provider: ProviderType; + event_type: string; + run_id: string; + movement: string; + session_id?: string; + call_id?: string; + message_id?: string; + request_id?: string; + data: Record; + }; + + expect(parsed.provider).toBe('opencode'); + expect(parsed.event_type).toBe('tool_use'); + expect(parsed.run_id).toBe('run-1'); + expect(parsed.movement).toBe('implement'); + expect(parsed.session_id).toBe('session-abc'); + expect(parsed.call_id).toBe('call-123'); + expect(parsed.message_id).toBe('msg-123'); + expect(parsed.request_id).toBe('req-123'); + expect(parsed.data['tool']).toBe('Read'); + }); + + it('should update movement and provider for subsequent events', () => { + const logger = createProviderEventLogger({ + logsDir: tempDir, + sessionId: 'session-2', + runId: 'run-2', + provider: 'claude', + movement: 'plan', + enabled: true, + }); + + const wrapped = logger.wrapCallback(); + + wrapped({ type: 'init', data: { model: 'sonnet', sessionId: 's-1' } }); + logger.setMovement('implement'); + logger.setProvider('codex'); + wrapped({ type: 'result', data: { result: 'ok', sessionId: 's-1', success: true } }); + + const lines = readFileSync(logger.filepath, 'utf-8').trim().split('\n'); + expect(lines).toHaveLength(2); + + const first = JSON.parse(lines[0]!) as { provider: ProviderType; movement: string }; + const second = JSON.parse(lines[1]!) as { provider: ProviderType; movement: string }; + + expect(first.provider).toBe('claude'); + expect(first.movement).toBe('plan'); + expect(second.provider).toBe('codex'); + expect(second.movement).toBe('implement'); + }); + + it('should not write records when disabled', () => { + const logger = createProviderEventLogger({ + logsDir: tempDir, + sessionId: 'session-3', + runId: 'run-3', + provider: 'claude', + movement: 'plan', + enabled: false, + }); + + const original = vi.fn(); + const wrapped = logger.wrapCallback(original); + wrapped({ type: 'text', data: { text: 'hello' } }); + + expect(original).toHaveBeenCalledTimes(1); + expect(existsSync(logger.filepath)).toBe(false); + }); + + it('should truncate long text fields', () => { + const logger = createProviderEventLogger({ + logsDir: tempDir, + sessionId: 'session-4', + runId: 'run-4', + provider: 'claude', + movement: 'plan', + enabled: true, + }); + + const wrapped = logger.wrapCallback(); + const longText = 'a'.repeat(11_000); + wrapped({ type: 'text', data: { text: longText } }); + + const line = readFileSync(logger.filepath, 'utf-8').trim(); + const parsed = JSON.parse(line) as { data: { text: string } }; + + expect(parsed.data.text.length).toBeLessThan(longText.length); + expect(parsed.data.text).toContain('...[truncated]'); + }); + + it('should write init event records with typed data objects', () => { + const logger = createProviderEventLogger({ + logsDir: tempDir, + sessionId: 'session-5', + runId: 'run-5', + provider: 'codex', + movement: 'implement', + enabled: true, + }); + + const wrapped = logger.wrapCallback(); + wrapped({ + type: 'init', + data: { + model: 'gpt-5-codex', + sessionId: 'thread-1', + }, + }); + + const line = readFileSync(logger.filepath, 'utf-8').trim(); + const parsed = JSON.parse(line) as { + provider: ProviderType; + event_type: string; + session_id?: string; + data: { model: string; sessionId: string }; + }; + + expect(parsed.provider).toBe('codex'); + expect(parsed.event_type).toBe('init'); + expect(parsed.session_id).toBe('thread-1'); + expect(parsed.data.model).toBe('gpt-5-codex'); + expect(parsed.data.sessionId).toBe('thread-1'); + }); +}); diff --git a/src/core/models/global-config.ts b/src/core/models/global-config.ts index 4974cd5..89223a2 100644 --- a/src/core/models/global-config.ts +++ b/src/core/models/global-config.ts @@ -20,6 +20,12 @@ export interface DebugConfig { logFile?: string; } +/** Observability configuration for runtime event logs */ +export interface ObservabilityConfig { + /** Enable provider stream event logging (default: true when undefined) */ + providerEvents?: boolean; +} + /** Language setting for takt */ export type Language = 'en' | 'ja'; @@ -55,6 +61,7 @@ export interface GlobalConfig { provider?: 'claude' | 'codex' | 'opencode' | 'mock'; model?: string; debug?: DebugConfig; + observability?: ObservabilityConfig; /** Directory for shared clones (worktree_dir in config). If empty, uses ../{clone-name} relative to project */ worktreeDir?: string; /** Auto-create PR after worktree execution (default: prompt in interactive mode) */ diff --git a/src/core/models/index.ts b/src/core/models/index.ts index bf9b5ef..8221700 100644 --- a/src/core/models/index.ts +++ b/src/core/models/index.ts @@ -22,6 +22,7 @@ export type { PieceState, CustomAgentConfig, DebugConfig, + ObservabilityConfig, Language, PipelineConfig, GlobalConfig, diff --git a/src/core/models/schemas.ts b/src/core/models/schemas.ts index 64a60d7..6ab743d 100644 --- a/src/core/models/schemas.ts +++ b/src/core/models/schemas.ts @@ -309,6 +309,10 @@ export const DebugConfigSchema = z.object({ log_file: z.string().optional(), }); +export const ObservabilityConfigSchema = z.object({ + provider_events: z.boolean().optional(), +}); + /** Language setting schema */ export const LanguageSchema = z.enum(['en', 'ja']); @@ -341,6 +345,7 @@ export const GlobalConfigSchema = z.object({ provider: z.enum(['claude', 'codex', 'opencode', 'mock']).optional().default('claude'), model: z.string().optional(), debug: DebugConfigSchema.optional(), + observability: ObservabilityConfigSchema.optional(), /** Directory for shared clones (worktree_dir in config). If empty, uses ../{clone-name} relative to project */ worktree_dir: z.string().optional(), /** Auto-create PR after worktree execution (default: prompt in interactive mode) */ diff --git a/src/core/models/types.ts b/src/core/models/types.ts index a578752..42e49e9 100644 --- a/src/core/models/types.ts +++ b/src/core/models/types.ts @@ -45,6 +45,7 @@ export type { export type { CustomAgentConfig, DebugConfig, + ObservabilityConfig, Language, PipelineConfig, GlobalConfig, diff --git a/src/features/tasks/execute/pieceExecution.ts b/src/features/tasks/execute/pieceExecution.ts index 4b58272..01bd805 100644 --- a/src/features/tasks/execute/pieceExecution.ts +++ b/src/features/tasks/execute/pieceExecution.ts @@ -59,6 +59,10 @@ import { isValidReportDirName, } from '../../../shared/utils/index.js'; import type { PromptLogRecord } from '../../../shared/utils/index.js'; +import { + createProviderEventLogger, + isProviderEventsEnabled, +} from '../../../shared/utils/providerEventLogger.js'; import { selectOption, promptInput } from '../../../shared/prompt/index.js'; import { getLabel } from '../../../shared/i18n/index.js'; import { installSigIntHandler } from './sigintHandler.js'; @@ -303,6 +307,14 @@ export async function executePiece( const shouldNotifyPieceComplete = shouldNotify && notificationSoundEvents?.pieceComplete !== false; const shouldNotifyPieceAbort = shouldNotify && notificationSoundEvents?.pieceAbort !== false; const currentProvider = globalConfig.provider ?? 'claude'; + const providerEventLogger = createProviderEventLogger({ + logsDir: runPaths.logsAbs, + sessionId: pieceSessionId, + runId: runSlug, + provider: currentProvider, + movement: options.startMovement ?? pieceConfig.initialMovement, + enabled: isProviderEventsEnabled(globalConfig), + }); // Prevent macOS idle sleep if configured if (globalConfig.preventSleep) { @@ -402,7 +414,7 @@ export async function executePiece( try { engine = new PieceEngine(pieceConfig, cwd, task, { abortSignal: runAbortController.signal, - onStream: streamHandler, + onStream: providerEventLogger.wrapCallback(streamHandler), onUserInput, initialSessions: savedSessions, onSessionUpdate: sessionUpdateHandler, @@ -492,6 +504,8 @@ export async function executePiece( }); const movementProvider = resolved.provider ?? currentProvider; const movementModel = resolved.model ?? globalConfig.model ?? '(default)'; + providerEventLogger.setMovement(step.name); + providerEventLogger.setProvider(movementProvider); out.info(`Provider: ${movementProvider}`); out.info(`Model: ${movementModel}`); diff --git a/src/infra/config/global/globalConfig.ts b/src/infra/config/global/globalConfig.ts index e7a7ea6..763f138 100644 --- a/src/infra/config/global/globalConfig.ts +++ b/src/infra/config/global/globalConfig.ts @@ -105,6 +105,9 @@ export class GlobalConfigManager { enabled: parsed.debug.enabled, logFile: parsed.debug.log_file, } : undefined, + observability: parsed.observability ? { + providerEvents: parsed.observability.provider_events, + } : undefined, worktreeDir: parsed.worktree_dir, autoPr: parsed.auto_pr, disabledBuiltins: parsed.disabled_builtins, @@ -158,6 +161,11 @@ export class GlobalConfigManager { log_file: config.debug.logFile, }; } + if (config.observability && config.observability.providerEvents !== undefined) { + raw.observability = { + provider_events: config.observability.providerEvents, + }; + } if (config.worktreeDir) { raw.worktree_dir = config.worktreeDir; } diff --git a/src/shared/utils/index.ts b/src/shared/utils/index.ts index 24859d5..b23b774 100644 --- a/src/shared/utils/index.ts +++ b/src/shared/utils/index.ts @@ -5,6 +5,7 @@ export * from './debug.js'; export * from './error.js'; export * from './notification.js'; +export * from './providerEventLogger.js'; export * from './reportDir.js'; export * from './sleep.js'; export * from './slug.js'; diff --git a/src/shared/utils/providerEventLogger.ts b/src/shared/utils/providerEventLogger.ts new file mode 100644 index 0000000..70b3630 --- /dev/null +++ b/src/shared/utils/providerEventLogger.ts @@ -0,0 +1,137 @@ +import { appendFileSync } from 'node:fs'; +import { join } from 'node:path'; +import type { ProviderType, StreamCallback, StreamEvent } from '../../core/piece/index.js'; + +export interface ProviderEventLoggerConfig { + logsDir: string; + sessionId: string; + runId: string; + provider: ProviderType; + movement: string; + enabled: boolean; +} + +export interface ProviderEventLogger { + readonly filepath: string; + setMovement(movement: string): void; + setProvider(provider: ProviderType): void; + wrapCallback(original?: StreamCallback): StreamCallback; +} + +interface ProviderEventLogRecord { + timestamp: string; + provider: ProviderType; + event_type: string; + run_id: string; + movement: string; + session_id?: string; + message_id?: string; + call_id?: string; + request_id?: string; + data: Record; +} + +const MAX_TEXT_LENGTH = 10_000; +const HEAD_LENGTH = 5_000; +const TAIL_LENGTH = 2_000; +const TRUNCATED_MARKER = '...[truncated]'; + +function truncateString(value: string): string { + if (value.length <= MAX_TEXT_LENGTH) { + return value; + } + return value.slice(0, HEAD_LENGTH) + TRUNCATED_MARKER + value.slice(-TAIL_LENGTH); +} + +function sanitizeData(data: Record): Record { + return Object.fromEntries( + Object.entries(data).map(([key, value]) => { + if (typeof value === 'string') { + return [key, truncateString(value)]; + } + return [key, value]; + }) + ); +} + +function pickString(source: Record, keys: string[]): string | undefined { + for (const key of keys) { + const value = source[key]; + if (typeof value === 'string' && value.length > 0) { + return value; + } + } + return undefined; +} + +function buildLogRecord( + event: StreamEvent, + provider: ProviderType, + movement: string, + runId: string, +): ProviderEventLogRecord { + const data = sanitizeData(event.data as unknown as Record); + const sessionId = pickString(data, ['session_id', 'sessionId', 'sessionID', 'thread_id', 'threadId']); + const messageId = pickString(data, ['message_id', 'messageId', 'item_id', 'itemId']); + const callId = pickString(data, ['call_id', 'callId', 'id']); + const requestId = pickString(data, ['request_id', 'requestId']); + + return { + timestamp: new Date().toISOString(), + provider, + event_type: event.type, + run_id: runId, + movement, + ...(sessionId ? { session_id: sessionId } : {}), + ...(messageId ? { message_id: messageId } : {}), + ...(callId ? { call_id: callId } : {}), + ...(requestId ? { request_id: requestId } : {}), + data, + }; +} + +export function createProviderEventLogger(config: ProviderEventLoggerConfig): ProviderEventLogger { + const filepath = join(config.logsDir, `${config.sessionId}-provider-events.jsonl`); + let movement = config.movement; + let provider = config.provider; + + const write = (event: StreamEvent): void => { + try { + const record = buildLogRecord(event, provider, movement, config.runId); + appendFileSync(filepath, JSON.stringify(record) + '\n', 'utf-8'); + } catch { + // Silently fail - observability logging should not interrupt main flow. + } + }; + + return { + filepath, + setMovement(nextMovement: string): void { + movement = nextMovement; + }, + setProvider(nextProvider: ProviderType): void { + provider = nextProvider; + }, + wrapCallback(original?: StreamCallback): StreamCallback { + if (!config.enabled && original) { + return original; + } + if (!config.enabled) { + return () => {}; + } + + return (event: StreamEvent): void => { + write(event); + original?.(event); + }; + }, + }; +} + +export function isProviderEventsEnabled(config?: { + observability?: { + providerEvents?: boolean; + }; +}): boolean { + return config?.observability?.providerEvents !== false; +}