From 76cfd771f8721e5e32ed675364bfec751d43b5fc Mon Sep 17 00:00:00 2001 From: nrs <38722970+nrslib@users.noreply.github.com> Date: Thu, 5 Mar 2026 11:27:59 +0900 Subject: [PATCH] takt: implement-usage-event-logging (#470) --- .../claude-executor-structured-output.test.ts | 45 +++ src/__tests__/codex-structured-output.test.ts | 38 +++ src/__tests__/globalConfig-defaults.test.ts | 24 ++ src/__tests__/logging-contracts.test.ts | 118 ++++++++ src/__tests__/models.test.ts | 5 + .../pieceExecution-session-loading.test.ts | 136 ++++++++- src/__tests__/providerEventLogger.test.ts | 14 + src/__tests__/runSessionReader.test.ts | 20 ++ src/__tests__/usageEventLogger.test.ts | 273 ++++++++++++++++++ src/core/logging/contracts.ts | 11 + src/core/logging/providerEvent.ts | 176 +++++++++++ src/core/logging/providerEventLogger.ts | 87 ++++++ src/core/logging/usageEventLogger.ts | 108 +++++++ src/core/models/index.ts | 1 + src/core/models/persisted-global-config.ts | 2 + src/core/models/response.ts | 13 + src/core/models/schemas.ts | 1 + src/core/models/types.ts | 1 + src/features/interactive/runSessionReader.ts | 10 +- src/features/tasks/execute/pieceExecution.ts | 36 +-- .../tasks/execute/pieceExecutionUtils.ts | 7 + src/infra/claude/client.ts | 3 + src/infra/claude/executor.ts | 60 +++- src/infra/claude/types.ts | 3 + src/infra/codex/client.ts | 59 +++- src/infra/config/global/globalConfigCore.ts | 1 + .../config/global/globalConfigSerializer.ts | 2 + src/shared/utils/providerEventLogger.ts | 151 +--------- src/shared/utils/usageEventLogger.ts | 8 + 29 files changed, 1246 insertions(+), 167 deletions(-) create mode 100644 src/__tests__/logging-contracts.test.ts create mode 100644 src/__tests__/usageEventLogger.test.ts create mode 100644 src/core/logging/contracts.ts create mode 100644 src/core/logging/providerEvent.ts create mode 100644 src/core/logging/providerEventLogger.ts create mode 100644 src/core/logging/usageEventLogger.ts create mode 100644 src/shared/utils/usageEventLogger.ts diff --git a/src/__tests__/claude-executor-structured-output.test.ts b/src/__tests__/claude-executor-structured-output.test.ts index 4bbe16e..0200cd8 100644 --- a/src/__tests__/claude-executor-structured-output.test.ts +++ b/src/__tests__/claude-executor-structured-output.test.ts @@ -161,4 +161,49 @@ describe('QueryExecutor — structuredOutput 抽出', () => { expect(result.content).toBe('final text'); expect(result.structuredOutput).toEqual({ step: 1, reason: 'approved' }); }); + + it('result メッセージの usage を providerUsage として抽出する', async () => { + mockQuery.mockReturnValue(createMockQuery([ + { + type: 'result', + subtype: 'success', + result: 'done', + usage: { + input_tokens: 12, + output_tokens: 34, + cache_creation_input_tokens: 5, + cache_read_input_tokens: 7, + }, + }, + ])); + + const executor = new QueryExecutor(); + const result = await executor.execute('test', { cwd: '/tmp' }); + const providerUsage = result.providerUsage; + + expect(providerUsage).toEqual({ + inputTokens: 12, + outputTokens: 34, + totalTokens: 46, + cachedInputTokens: 12, + cacheCreationInputTokens: 5, + cacheReadInputTokens: 7, + usageMissing: false, + }); + }); + + it('usage が存在しない場合は usageMissing=true と reason を返す', async () => { + mockQuery.mockReturnValue(createMockQuery([ + { type: 'result', subtype: 'success', result: 'done' }, + ])); + + const executor = new QueryExecutor(); + const result = await executor.execute('test', { cwd: '/tmp' }); + const providerUsage = result.providerUsage; + + expect(providerUsage).toMatchObject({ + usageMissing: true, + reason: 'usage_not_available', + }); + }); }); diff --git a/src/__tests__/codex-structured-output.test.ts b/src/__tests__/codex-structured-output.test.ts index 838f570..322e3ed 100644 --- a/src/__tests__/codex-structured-output.test.ts +++ b/src/__tests__/codex-structured-output.test.ts @@ -191,4 +191,42 @@ describe('CodexClient — structuredOutput 抽出', () => { codexPathOverride: '/opt/codex/bin/codex', }); }); + + it('turn.completed の usage を providerUsage として返す', async () => { + mockEvents = [ + { type: 'thread.started', thread_id: 'thread-1' }, + { + type: 'turn.completed', + usage: { input_tokens: 11, output_tokens: 22, cached_input_tokens: 3 }, + }, + ]; + + const client = new CodexClient(); + const result = await client.call('coder', 'prompt', { cwd: '/tmp' }); + const providerUsage = result.providerUsage; + + expect(providerUsage).toEqual({ + inputTokens: 11, + outputTokens: 22, + totalTokens: 33, + cachedInputTokens: 3, + usageMissing: false, + }); + }); + + it('turn.completed に usage がない場合は usageMissing=true と reason を返す', async () => { + mockEvents = [ + { type: 'thread.started', thread_id: 'thread-1' }, + { type: 'turn.completed' }, + ]; + + const client = new CodexClient(); + const result = await client.call('coder', 'prompt', { cwd: '/tmp' }); + const providerUsage = result.providerUsage; + + expect(providerUsage).toMatchObject({ + usageMissing: true, + reason: 'usage_not_available', + }); + }); }); diff --git a/src/__tests__/globalConfig-defaults.test.ts b/src/__tests__/globalConfig-defaults.test.ts index de544f0..46e18f3 100644 --- a/src/__tests__/globalConfig-defaults.test.ts +++ b/src/__tests__/globalConfig-defaults.test.ts @@ -503,6 +503,7 @@ describe('loadGlobalConfig', () => { 'language: en', 'logging:', ' provider_events: false', + ' usage_events: true', ].join('\n'), 'utf-8', ); @@ -510,6 +511,7 @@ describe('loadGlobalConfig', () => { const config = loadGlobalConfig(); expect(config.logging).toEqual({ providerEvents: false, + usageEvents: true, }); }); @@ -525,6 +527,7 @@ describe('loadGlobalConfig', () => { ' trace: true', ' debug: true', ' provider_events: true', + ' usage_events: false', ].join('\n'), 'utf-8', ); @@ -535,6 +538,7 @@ describe('loadGlobalConfig', () => { trace: true, debug: true, providerEvents: true, + usageEvents: false, }); }); @@ -549,6 +553,7 @@ describe('loadGlobalConfig', () => { trace: false, debug: true, providerEvents: false, + usageEvents: true, }; saveGlobalConfig(config); invalidateGlobalConfigCache(); @@ -559,6 +564,7 @@ describe('loadGlobalConfig', () => { trace: false, debug: true, providerEvents: false, + usageEvents: true, }); }); @@ -580,6 +586,24 @@ describe('loadGlobalConfig', () => { }); }); + it('should save partial logging config (only usage_events)', () => { + const taktDir = join(testHomeDir, '.takt'); + mkdirSync(taktDir, { recursive: true }); + writeFileSync(getGlobalConfigPath(), 'language: en\n', 'utf-8'); + + const config = loadGlobalConfig(); + config.logging = { + usageEvents: true, + }; + saveGlobalConfig(config); + invalidateGlobalConfigCache(); + + const reloaded = loadGlobalConfig(); + expect(reloaded.logging).toEqual({ + usageEvents: true, + }); + }); + it('should save and reload notification_sound_events config', () => { const taktDir = join(testHomeDir, '.takt'); diff --git a/src/__tests__/logging-contracts.test.ts b/src/__tests__/logging-contracts.test.ts new file mode 100644 index 0000000..f88d3bb --- /dev/null +++ b/src/__tests__/logging-contracts.test.ts @@ -0,0 +1,118 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { mkdirSync, rmSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { + PROVIDER_EVENTS_LOG_FILE_SUFFIX, + USAGE_EVENTS_LOG_FILE_SUFFIX, + USAGE_MISSING_REASONS, +} from '../core/logging/contracts.js'; +import { buildUsageEventRecord } from '../core/logging/providerEvent.js'; +import { createProviderEventLogger } from '../core/logging/providerEventLogger.js'; +import { createUsageEventLogger } from '../core/logging/usageEventLogger.js'; +import type { ProviderUsageSnapshot } from '../core/models/response.js'; + +describe('logging contracts', () => { + let tempDir: string; + + beforeEach(() => { + tempDir = join(tmpdir(), `takt-logging-contracts-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`); + mkdirSync(tempDir, { recursive: true }); + }); + + afterEach(() => { + rmSync(tempDir, { recursive: true, force: true }); + }); + + it('should use shared file suffix contracts for provider and usage loggers', () => { + const providerLogger = createProviderEventLogger({ + logsDir: tempDir, + sessionId: 'session-a', + runId: 'run-a', + provider: 'claude', + movement: 'plan', + enabled: false, + }); + const usageLogger = createUsageEventLogger({ + logsDir: tempDir, + sessionId: 'session-b', + runId: 'run-b', + provider: 'codex', + providerModel: 'gpt-5-codex', + movement: 'implement', + movementType: 'normal', + enabled: false, + }); + + expect(providerLogger.filepath.endsWith(PROVIDER_EVENTS_LOG_FILE_SUFFIX)).toBe(true); + expect(usageLogger.filepath.endsWith(USAGE_EVENTS_LOG_FILE_SUFFIX)).toBe(true); + }); + + it('should accept shared ProviderUsageSnapshot contract in usage record builder', () => { + const usage: ProviderUsageSnapshot = { + inputTokens: 12, + outputTokens: 8, + totalTokens: 20, + cachedInputTokens: 4, + cacheCreationInputTokens: 2, + cacheReadInputTokens: 2, + usageMissing: false, + }; + + const record = buildUsageEventRecord( + { + runId: 'run-1', + sessionId: 'session-1', + provider: 'claude', + providerModel: 'sonnet', + movement: 'implement', + movementType: 'normal', + }, + { success: true, usage, timestamp: new Date('2026-03-04T12:00:00.000Z') }, + ); + + expect(record.usage.cached_input_tokens).toBe(4); + }); + + it('should reject usage_missing records with unknown reason values', () => { + expect(() => + buildUsageEventRecord( + { + runId: 'run-2', + sessionId: 'session-2', + provider: 'opencode', + providerModel: 'openai/gpt-4.1', + movement: 'implement', + movementType: 'normal', + }, + { + success: true, + usage: { + usageMissing: true, + reason: 'invalid_reason', + } as ProviderUsageSnapshot, + }, + ), + ).toThrow('[usage-events] reason is invalid'); + + expect(() => + buildUsageEventRecord( + { + runId: 'run-3', + sessionId: 'session-3', + provider: 'opencode', + providerModel: 'openai/gpt-4.1', + movement: 'implement', + movementType: 'normal', + }, + { + success: true, + usage: { + usageMissing: true, + reason: USAGE_MISSING_REASONS.NOT_AVAILABLE, + }, + }, + ), + ).not.toThrow(); + }); +}); diff --git a/src/__tests__/models.test.ts b/src/__tests__/models.test.ts index 471470b..a2b8cc9 100644 --- a/src/__tests__/models.test.ts +++ b/src/__tests__/models.test.ts @@ -614,11 +614,13 @@ describe('GlobalConfigSchema', () => { const config = { logging: { provider_events: false, + usage_events: true, }, }; const result = GlobalConfigSchema.parse(config); expect(result.logging?.provider_events).toBe(false); + expect(result.logging?.usage_events).toBe(true); }); it('should accept full logging config with all fields', () => { @@ -628,6 +630,7 @@ describe('GlobalConfigSchema', () => { trace: true, debug: true, provider_events: true, + usage_events: false, }, }; @@ -636,6 +639,7 @@ describe('GlobalConfigSchema', () => { expect(result.logging?.trace).toBe(true); expect(result.logging?.debug).toBe(true); expect(result.logging?.provider_events).toBe(true); + expect(result.logging?.usage_events).toBe(false); }); it('should accept partial logging config', () => { @@ -650,6 +654,7 @@ describe('GlobalConfigSchema', () => { expect(result.logging?.trace).toBeUndefined(); expect(result.logging?.debug).toBeUndefined(); expect(result.logging?.provider_events).toBeUndefined(); + expect(result.logging?.usage_events).toBeUndefined(); }); it('should reject invalid logging level', () => { diff --git a/src/__tests__/pieceExecution-session-loading.test.ts b/src/__tests__/pieceExecution-session-loading.test.ts index d5997e5..98dd9a8 100644 --- a/src/__tests__/pieceExecution-session-loading.test.ts +++ b/src/__tests__/pieceExecution-session-loading.test.ts @@ -6,14 +6,45 @@ */ import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { USAGE_MISSING_REASONS } from '../core/logging/contracts.js'; import type { PieceConfig } from '../core/models/index.js'; -const { MockPieceEngine, mockLoadPersonaSessions, mockLoadWorktreeSessions } = vi.hoisted(() => { +const { + MockPieceEngine, + mockLoadPersonaSessions, + mockLoadWorktreeSessions, + mockCreateUsageEventLogger, + mockUsageLogger, + mockMovementResponse, +} = vi.hoisted(() => { // eslint-disable-next-line @typescript-eslint/no-require-imports const { EventEmitter: EE } = require('node:events') as typeof import('node:events'); const mockLoadPersonaSessions = vi.fn().mockReturnValue({ coder: 'saved-session-id' }); const mockLoadWorktreeSessions = vi.fn().mockReturnValue({ coder: 'worktree-session-id' }); + const mockUsageLogger = { + filepath: '/tmp/test-usage-events.jsonl', + setMovement: vi.fn(), + setProvider: vi.fn(), + logUsage: vi.fn(), + }; + const mockCreateUsageEventLogger = vi.fn().mockReturnValue(mockUsageLogger); + const mockMovementResponse: { + providerUsage: { + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + usageMissing: boolean; + reason?: string; + } | undefined; + } = { + providerUsage: { + inputTokens: 3, + outputTokens: 2, + totalTokens: 5, + usageMissing: false, + }, + }; type PersonaProviderMap = Record; @@ -47,13 +78,28 @@ const { MockPieceEngine, mockLoadPersonaSessions, mockLoadWorktreeSessions } = v if (firstStep) { const providerInfo = resolveProviderInfo(firstStep, this.receivedOptions); this.emit('movement:start', firstStep, 1, firstStep.instructionTemplate, providerInfo); + this.emit('movement:complete', firstStep, { + persona: firstStep.personaDisplayName, + status: 'done', + content: 'ok', + timestamp: new Date('2026-03-04T00:00:00.000Z'), + sessionId: 'movement-session', + providerUsage: mockMovementResponse.providerUsage, + }, firstStep.instructionTemplate); } this.emit('piece:complete', { status: 'completed', iteration: 1 }); return { status: 'completed', iteration: 1 }; } } - return { MockPieceEngine, mockLoadPersonaSessions, mockLoadWorktreeSessions }; + return { + MockPieceEngine, + mockLoadPersonaSessions, + mockLoadWorktreeSessions, + mockCreateUsageEventLogger, + mockUsageLogger, + mockMovementResponse, + }; }); vi.mock('../core/piece/index.js', async () => { @@ -146,6 +192,10 @@ vi.mock('../shared/prompt/index.js', () => ({ selectOption: vi.fn(), promptInput: vi.fn(), })); +vi.mock('../shared/utils/usageEventLogger.js', () => ({ + createUsageEventLogger: mockCreateUsageEventLogger, + isUsageEventsEnabled: vi.fn().mockReturnValue(true), +})); vi.mock('../shared/i18n/index.js', () => ({ getLabel: vi.fn().mockImplementation((key: string) => key), @@ -188,12 +238,30 @@ function makeConfig(): PieceConfig { }; } +function makeConfigWithMovement(overrides: Record): PieceConfig { + const baseMovement = makeConfig().movements[0]; + if (!baseMovement) { + throw new Error('Base movement is required'); + } + return { + ...makeConfig(), + movements: [{ ...baseMovement, ...overrides }], + }; +} + describe('executePiece session loading', () => { beforeEach(() => { vi.clearAllMocks(); + mockCreateUsageEventLogger.mockReturnValue(mockUsageLogger); vi.mocked(resolvePieceConfigValues).mockReturnValue({ ...defaultResolvedConfigValues }); mockLoadPersonaSessions.mockReturnValue({ coder: 'saved-session-id' }); mockLoadWorktreeSessions.mockReturnValue({ coder: 'worktree-session-id' }); + mockMovementResponse.providerUsage = { + inputTokens: 3, + outputTokens: 2, + totalTokens: 5, + usageMissing: false, + }; }); it('should pass empty initialSessions on normal run', async () => { @@ -208,6 +276,41 @@ describe('executePiece session loading', () => { expect(MockPieceEngine.lastInstance.receivedOptions.initialSessions).toEqual({}); }); + it('should log usage events on movement completion when usage logging is enabled', async () => { + await executePiece(makeConfig(), 'task', '/tmp/project', { + projectCwd: '/tmp/project', + }); + + expect(mockCreateUsageEventLogger).toHaveBeenCalledOnce(); + expect(mockUsageLogger.setMovement).toHaveBeenCalledWith('implement', 'normal'); + expect(mockUsageLogger.setProvider).toHaveBeenCalledWith('claude', '(default)'); + expect(mockUsageLogger.logUsage).toHaveBeenCalledWith({ + success: true, + usage: { + inputTokens: 3, + outputTokens: 2, + totalTokens: 5, + usageMissing: false, + }, + }); + }); + + it('should log usage_missing reason when provider usage is unavailable', async () => { + mockMovementResponse.providerUsage = undefined; + + await executePiece(makeConfig(), 'task', '/tmp/project', { + projectCwd: '/tmp/project', + }); + + expect(mockUsageLogger.logUsage).toHaveBeenCalledWith({ + success: true, + usage: { + usageMissing: true, + reason: USAGE_MISSING_REASONS.NOT_AVAILABLE, + }, + }); + }); + it('should load persisted sessions when startMovement is set (retry)', async () => { // Given: retry execution with startMovement await executePiece(makeConfig(), 'task', '/tmp/project', { @@ -314,4 +417,33 @@ describe('executePiece session loading', () => { expect(mockInfo).toHaveBeenCalledWith('Provider: opencode'); expect(mockInfo).toHaveBeenCalledWith('Model: gpt-5'); }); + + it('should pass movement type to usage logger for parallel movement', async () => { + await executePiece(makeConfigWithMovement({ parallel: { branches: [] } }), 'task', '/tmp/project', { + projectCwd: '/tmp/project', + }); + + expect(mockUsageLogger.setMovement).toHaveBeenCalledWith('implement', 'parallel'); + }); + + it('should pass movement type to usage logger for arpeggio movement', async () => { + await executePiece(makeConfigWithMovement({ arpeggio: { source: './items.csv' } }), 'task', '/tmp/project', { + projectCwd: '/tmp/project', + }); + + expect(mockUsageLogger.setMovement).toHaveBeenCalledWith('implement', 'arpeggio'); + }); + + it('should pass movement type to usage logger for team leader movement', async () => { + await executePiece( + makeConfigWithMovement({ teamLeader: { output: { mode: 'summary' } } }), + 'task', + '/tmp/project', + { + projectCwd: '/tmp/project', + }, + ); + + expect(mockUsageLogger.setMovement).toHaveBeenCalledWith('implement', 'team_leader'); + }); }); diff --git a/src/__tests__/providerEventLogger.test.ts b/src/__tests__/providerEventLogger.test.ts index 89a0e9a..a2701cf 100644 --- a/src/__tests__/providerEventLogger.test.ts +++ b/src/__tests__/providerEventLogger.test.ts @@ -94,6 +94,20 @@ describe('providerEventLogger', () => { expect(parsed.data['tool']).toBe('Read'); }); + it('should keep provider-events filename suffix for backward compatibility', () => { + const logger = createProviderEventLogger({ + logsDir: tempDir, + sessionId: 'session-compat', + runId: 'run-compat', + provider: 'claude', + movement: 'plan', + enabled: true, + }); + + expect(logger.filepath.endsWith('-provider-events.jsonl')).toBe(true); + expect(logger.filepath.endsWith('-usage-events.jsonl')).toBe(false); + }); + it('should update movement and provider for subsequent events', () => { const logger = createProviderEventLogger({ logsDir: tempDir, diff --git a/src/__tests__/runSessionReader.test.ts b/src/__tests__/runSessionReader.test.ts index b8aedc5..d7284b8 100644 --- a/src/__tests__/runSessionReader.test.ts +++ b/src/__tests__/runSessionReader.test.ts @@ -318,6 +318,26 @@ describe('loadRunSessionContext', () => { expect(context.movementLogs).toEqual([]); }); + it('should exclude usage-events log files', () => { + const slug = 'usage-events-run'; + const runDir = createRunDir(tmpDir, slug, { + task: 'Usage events test', + piece: 'default', + status: 'completed', + startTime: '2026-02-01T00:00:00.000Z', + logsDirectory: `.takt/runs/${slug}/logs`, + reportDirectory: `.takt/runs/${slug}/reports`, + runSlug: slug, + }); + + // Only usage-events log file + writeFileSync(join(runDir, 'logs', 'session-001-usage-events.jsonl'), '{}', 'utf-8'); + + const context = loadRunSessionContext(tmpDir, slug); + expect(mockLoadNdjsonLog).not.toHaveBeenCalled(); + expect(context.movementLogs).toEqual([]); + }); + afterEach(() => { rmSync(tmpDir, { recursive: true, force: true }); }); diff --git a/src/__tests__/usageEventLogger.test.ts b/src/__tests__/usageEventLogger.test.ts new file mode 100644 index 0000000..b4d6891 --- /dev/null +++ b/src/__tests__/usageEventLogger.test.ts @@ -0,0 +1,273 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { existsSync, mkdirSync, readFileSync, rmSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; + +type ProviderType = 'claude' | 'codex' | 'opencode'; +type MovementType = 'normal' | 'parallel' | 'arpeggio' | 'team_leader'; + +interface ProviderUsageSnapshot { + readonly inputTokens?: number; + readonly outputTokens?: number; + readonly totalTokens?: number; + readonly cachedInputTokens?: number; + readonly usageMissing: boolean; + readonly reason?: string; +} + +interface UsageEventLoggerConfig { + readonly logsDir: string; + readonly sessionId: string; + readonly runId: string; + readonly provider: ProviderType; + readonly providerModel: string; + readonly movement: string; + readonly movementType: MovementType; + readonly enabled: boolean; +} + +interface UsageEventLogger { + readonly filepath: string; + setMovement(movement: string, movementType: MovementType): void; + setProvider(provider: ProviderType, providerModel: string): void; + logUsage(params: { + readonly success: boolean; + readonly usage: ProviderUsageSnapshot; + readonly timestamp?: Date; + }): void; +} + +interface UsageEventLoggerModule { + createUsageEventLogger(config: UsageEventLoggerConfig): UsageEventLogger; + isUsageEventsEnabled(config?: { logging?: { usageEvents?: boolean } }): boolean; +} + +const USAGE_EVENT_LOGGER_MODULE_PATH = ['..', 'shared', 'utils', 'usageEventLogger.js'].join('/'); + +async function loadUsageEventLoggerModule(): Promise { + return (await import(USAGE_EVENT_LOGGER_MODULE_PATH)) as UsageEventLoggerModule; +} + +describe('usageEventLogger', () => { + let tempDir: string; + + beforeEach(() => { + tempDir = join(tmpdir(), `takt-usage-events-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`); + mkdirSync(tempDir, { recursive: true }); + }); + + afterEach(() => { + rmSync(tempDir, { recursive: true, force: true }); + }); + + it('should disable usage events by default', async () => { + const { isUsageEventsEnabled } = await loadUsageEventLoggerModule(); + + expect(isUsageEventsEnabled()).toBe(false); + expect(isUsageEventsEnabled({})).toBe(false); + expect(isUsageEventsEnabled({ logging: {} })).toBe(false); + }); + + it('should enable usage events only when explicitly true', async () => { + const { isUsageEventsEnabled } = await loadUsageEventLoggerModule(); + + expect(isUsageEventsEnabled({ logging: { usageEvents: true } })).toBe(true); + expect(isUsageEventsEnabled({ logging: { usageEvents: false } })).toBe(false); + }); + + it('should write usage event records with required fields', async () => { + const { createUsageEventLogger } = await loadUsageEventLoggerModule(); + const logger = createUsageEventLogger({ + logsDir: tempDir, + sessionId: 'session-1', + runId: 'run-1', + provider: 'codex', + providerModel: 'gpt-5-codex', + movement: 'implement', + movementType: 'normal', + enabled: true, + }); + + logger.logUsage({ + success: true, + usage: { + inputTokens: 12, + outputTokens: 8, + totalTokens: 20, + cachedInputTokens: 4, + usageMissing: false, + }, + timestamp: new Date('2026-03-04T12:00:00.000Z'), + }); + + expect(existsSync(logger.filepath)).toBe(true); + + const line = readFileSync(logger.filepath, 'utf-8').trim(); + const parsed = JSON.parse(line) as { + run_id: string; + session_id: string; + provider: ProviderType; + provider_model: string; + movement: string; + movement_type: MovementType; + timestamp: string; + success: boolean; + usage_missing: boolean; + reason?: string; + usage: { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cached_input_tokens?: number; + }; + }; + + expect(parsed.run_id).toBe('run-1'); + expect(parsed.session_id).toBe('session-1'); + expect(parsed.provider).toBe('codex'); + expect(parsed.provider_model).toBe('gpt-5-codex'); + expect(parsed.movement).toBe('implement'); + expect(parsed.movement_type).toBe('normal'); + expect(parsed.success).toBe(true); + expect(parsed.usage_missing).toBe(false); + expect(parsed.timestamp).toBe('2026-03-04T12:00:00.000Z'); + expect(parsed.usage.input_tokens).toBe(12); + expect(parsed.usage.output_tokens).toBe(8); + expect(parsed.usage.total_tokens).toBe(20); + expect(parsed.usage.cached_input_tokens).toBe(4); + }); + + it('should write usage_missing and reason when provider usage is unavailable', async () => { + const { createUsageEventLogger } = await loadUsageEventLoggerModule(); + const logger = createUsageEventLogger({ + logsDir: tempDir, + sessionId: 'session-2', + runId: 'run-2', + provider: 'opencode', + providerModel: 'openai/gpt-4.1', + movement: 'implement', + movementType: 'normal', + enabled: true, + }); + + logger.logUsage({ + success: true, + usage: { + usageMissing: true, + reason: 'usage_not_supported_by_provider', + }, + }); + + const line = readFileSync(logger.filepath, 'utf-8').trim(); + const parsed = JSON.parse(line) as { + provider: ProviderType; + usage_missing: boolean; + reason?: string; + usage: { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cached_input_tokens?: number; + }; + }; + + expect(parsed.provider).toBe('opencode'); + expect(parsed.usage_missing).toBe(true); + expect(parsed.reason).toBe('usage_not_supported_by_provider'); + expect(parsed.usage).toEqual({}); + }); + + it('should update movement and provider metadata for subsequent records', async () => { + const { createUsageEventLogger } = await loadUsageEventLoggerModule(); + const logger = createUsageEventLogger({ + logsDir: tempDir, + sessionId: 'session-3', + runId: 'run-3', + provider: 'claude', + providerModel: 'sonnet', + movement: 'plan', + movementType: 'normal', + enabled: true, + }); + + logger.logUsage({ + success: true, + usage: { inputTokens: 1, outputTokens: 2, totalTokens: 3, usageMissing: false }, + }); + + logger.setMovement('implement', 'parallel'); + logger.setProvider('codex', 'gpt-5-codex'); + logger.logUsage({ + success: true, + usage: { inputTokens: 4, outputTokens: 5, totalTokens: 9, usageMissing: false }, + }); + + const lines = readFileSync(logger.filepath, 'utf-8').trim().split('\n'); + expect(lines).toHaveLength(2); + + const first = JSON.parse(lines[0] ?? '{}') as { provider: ProviderType; provider_model: string; movement: string; movement_type: MovementType }; + const second = JSON.parse(lines[1] ?? '{}') as { provider: ProviderType; provider_model: string; movement: string; movement_type: MovementType }; + + expect(first.provider).toBe('claude'); + expect(first.provider_model).toBe('sonnet'); + expect(first.movement).toBe('plan'); + expect(first.movement_type).toBe('normal'); + + expect(second.provider).toBe('codex'); + expect(second.provider_model).toBe('gpt-5-codex'); + expect(second.movement).toBe('implement'); + expect(second.movement_type).toBe('parallel'); + }); + + it('should not write records when disabled', async () => { + const { createUsageEventLogger } = await loadUsageEventLoggerModule(); + const logger = createUsageEventLogger({ + logsDir: tempDir, + sessionId: 'session-disabled', + runId: 'run-disabled', + provider: 'claude', + providerModel: 'sonnet', + movement: 'plan', + movementType: 'normal', + enabled: false, + }); + + logger.logUsage({ + success: true, + usage: { inputTokens: 1, outputTokens: 1, totalTokens: 2, usageMissing: false }, + }); + + expect(existsSync(logger.filepath)).toBe(false); + }); + + it('should report file write failures to stderr only once', async () => { + const { createUsageEventLogger } = await loadUsageEventLoggerModule(); + const logger = createUsageEventLogger({ + logsDir: join(tempDir, 'missing', 'nested'), + sessionId: 'session-err', + runId: 'run-err', + provider: 'claude', + providerModel: 'sonnet', + movement: 'plan', + movementType: 'normal', + enabled: true, + }); + + const stderrSpy = vi.spyOn(process.stderr, 'write').mockImplementation(() => true); + try { + logger.logUsage({ + success: true, + usage: { inputTokens: 1, outputTokens: 1, totalTokens: 2, usageMissing: false }, + }); + logger.logUsage({ + success: true, + usage: { inputTokens: 2, outputTokens: 2, totalTokens: 4, usageMissing: false }, + }); + + expect(stderrSpy).toHaveBeenCalledTimes(1); + expect(stderrSpy.mock.calls[0]?.[0]).toContain('Failed to write usage event log'); + } finally { + stderrSpy.mockRestore(); + } + }); +}); diff --git a/src/core/logging/contracts.ts b/src/core/logging/contracts.ts new file mode 100644 index 0000000..6e6f119 --- /dev/null +++ b/src/core/logging/contracts.ts @@ -0,0 +1,11 @@ +export const PROVIDER_EVENTS_LOG_FILE_SUFFIX = '-provider-events.jsonl'; +export const USAGE_EVENTS_LOG_FILE_SUFFIX = '-usage-events.jsonl'; + +export const USAGE_MISSING_REASONS = { + NOT_AVAILABLE: 'usage_not_available', + TOKENS_MISSING: 'usage_tokens_missing', + NOT_SUPPORTED_BY_PROVIDER: 'usage_not_supported_by_provider', +} as const; + +export type UsageMissingReason = + (typeof USAGE_MISSING_REASONS)[keyof typeof USAGE_MISSING_REASONS]; diff --git a/src/core/logging/providerEvent.ts b/src/core/logging/providerEvent.ts new file mode 100644 index 0000000..467769b --- /dev/null +++ b/src/core/logging/providerEvent.ts @@ -0,0 +1,176 @@ +import type { ProviderType, StreamEvent } from '../../shared/types/provider.js'; +import type { ProviderUsageSnapshot } from '../models/response.js'; +import { USAGE_MISSING_REASONS, type UsageMissingReason } from './contracts.js'; + +export type MovementType = 'normal' | 'parallel' | 'arpeggio' | 'team_leader'; + +export interface ProviderEventLogRecord { + timestamp: string; + provider: ProviderType; + event_type: string; + run_id: string; + movement: string; + session_id?: string; + message_id?: string; + call_id?: string; + request_id?: string; + data: Record; +} + +export interface UsageEventLogRecord { + run_id: string; + session_id: string; + provider: ProviderType; + provider_model: string; + movement: string; + movement_type: MovementType; + timestamp: string; + success: boolean; + usage_missing: boolean; + reason?: UsageMissingReason; + usage: { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cached_input_tokens?: number; + }; +} + +interface UsageEventMeta { + runId: string; + sessionId: string; + provider: ProviderType; + providerModel: string; + movement: string; + movementType: MovementType; +} + +interface BuildUsageRecordParams { + success: boolean; + usage: ProviderUsageSnapshot; + timestamp?: Date; +} + +const MAX_TEXT_LENGTH = 10_000; +const HEAD_LENGTH = 5_000; +const TAIL_LENGTH = 2_000; +const TRUNCATED_MARKER = '...[truncated]'; + +function truncateString(value: string): string { + if (value.length <= MAX_TEXT_LENGTH) { + return value; + } + return value.slice(0, HEAD_LENGTH) + TRUNCATED_MARKER + value.slice(-TAIL_LENGTH); +} + +function sanitizeData(data: Record): Record { + return Object.fromEntries( + Object.entries(data).map(([key, value]) => { + if (typeof value === 'string') { + return [key, truncateString(value)]; + } + return [key, value]; + }) + ); +} + +function pickString(source: Record, keys: string[]): string | undefined { + for (const key of keys) { + const value = source[key]; + if (typeof value === 'string' && value.length > 0) { + return value; + } + } + return undefined; +} + +function assertFiniteNumber(value: number | undefined, field: string): void { + if (!Number.isFinite(value)) { + throw new Error(`[usage-events] ${field} is required`); + } +} + +function assertUsageMissingReason(value: string): UsageMissingReason { + for (const reason of Object.values(USAGE_MISSING_REASONS)) { + if (value === reason) { + return value; + } + } + throw new Error('[usage-events] reason is invalid'); +} + +export function normalizeProviderEvent( + event: StreamEvent, + provider: ProviderType, + movement: string, + runId: string +): ProviderEventLogRecord { + const data = sanitizeData(event.data as unknown as Record); + const sessionId = pickString(data, ['session_id', 'sessionId', 'sessionID', 'thread_id', 'threadId']); + const messageId = pickString(data, ['message_id', 'messageId', 'item_id', 'itemId']); + const callId = pickString(data, ['call_id', 'callId', 'id']); + const requestId = pickString(data, ['request_id', 'requestId']); + + return { + timestamp: new Date().toISOString(), + provider, + event_type: event.type, + run_id: runId, + movement, + ...(sessionId ? { session_id: sessionId } : {}), + ...(messageId ? { message_id: messageId } : {}), + ...(callId ? { call_id: callId } : {}), + ...(requestId ? { request_id: requestId } : {}), + data, + }; +} + +export function buildUsageEventRecord( + meta: UsageEventMeta, + params: BuildUsageRecordParams +): UsageEventLogRecord { + if (params.usage.usageMissing) { + if (typeof params.usage.reason !== 'string' || params.usage.reason.length === 0) { + throw new Error('[usage-events] reason is required when usageMissing=true'); + } + return { + run_id: meta.runId, + session_id: meta.sessionId, + provider: meta.provider, + provider_model: meta.providerModel, + movement: meta.movement, + movement_type: meta.movementType, + timestamp: (params.timestamp ?? new Date()).toISOString(), + success: params.success, + usage_missing: true, + reason: assertUsageMissingReason(params.usage.reason), + usage: {}, + }; + } + + assertFiniteNumber(params.usage.inputTokens, 'usage.inputTokens'); + assertFiniteNumber(params.usage.outputTokens, 'usage.outputTokens'); + assertFiniteNumber(params.usage.totalTokens, 'usage.totalTokens'); + + const usage = { + input_tokens: params.usage.inputTokens, + output_tokens: params.usage.outputTokens, + total_tokens: params.usage.totalTokens, + ...(Number.isFinite(params.usage.cachedInputTokens) + ? { cached_input_tokens: params.usage.cachedInputTokens } + : {}), + }; + + return { + run_id: meta.runId, + session_id: meta.sessionId, + provider: meta.provider, + provider_model: meta.providerModel, + movement: meta.movement, + movement_type: meta.movementType, + timestamp: (params.timestamp ?? new Date()).toISOString(), + success: params.success, + usage_missing: false, + usage, + }; +} diff --git a/src/core/logging/providerEventLogger.ts b/src/core/logging/providerEventLogger.ts new file mode 100644 index 0000000..8c1226f --- /dev/null +++ b/src/core/logging/providerEventLogger.ts @@ -0,0 +1,87 @@ +import { appendFileSync } from 'node:fs'; +import { join } from 'node:path'; +import type { ProviderType, StreamCallback, StreamEvent } from '../../shared/types/provider.js'; +import { PROVIDER_EVENTS_LOG_FILE_SUFFIX } from './contracts.js'; +import { normalizeProviderEvent } from './providerEvent.js'; + +export interface ProviderEventLoggerConfig { + logsDir: string; + sessionId: string; + runId: string; + provider: ProviderType; + movement: string; + enabled: boolean; +} + +export interface ProviderEventLogger { + readonly filepath: string; + setMovement(movement: string): void; + setProvider(provider: ProviderType): void; + wrapCallback(original?: StreamCallback): StreamCallback; +} + +function assertNonEmpty(value: string, field: string): void { + if (value.length === 0) { + throw new Error(`[provider-events] ${field} is required`); + } +} + +export function createProviderEventLogger(config: ProviderEventLoggerConfig): ProviderEventLogger { + if (config.enabled) { + assertNonEmpty(config.logsDir, 'logsDir'); + assertNonEmpty(config.sessionId, 'sessionId'); + assertNonEmpty(config.runId, 'runId'); + assertNonEmpty(config.movement, 'movement'); + } + + const filepath = join(config.logsDir, `${config.sessionId}${PROVIDER_EVENTS_LOG_FILE_SUFFIX}`); + let movement = config.movement; + let provider = config.provider; + let hasReportedWriteFailure = false; + + const write = (event: StreamEvent): void => { + const record = normalizeProviderEvent(event, provider, movement, config.runId); + try { + appendFileSync(filepath, JSON.stringify(record) + '\n', 'utf-8'); + } catch (error) { + if (hasReportedWriteFailure) { + return; + } + hasReportedWriteFailure = true; + const message = error instanceof Error ? error.message : String(error); + process.stderr.write(`[takt] Failed to write provider event log: ${message}\n`); + } + }; + + return { + filepath, + setMovement(nextMovement: string): void { + assertNonEmpty(nextMovement, 'movement'); + movement = nextMovement; + }, + setProvider(nextProvider: ProviderType): void { + provider = nextProvider; + }, + wrapCallback(original?: StreamCallback): StreamCallback { + if (!config.enabled && original) { + return original; + } + if (!config.enabled) { + return () => {}; + } + + return (event: StreamEvent): void => { + write(event); + original?.(event); + }; + }, + }; +} + +export function isProviderEventsEnabled(config?: { + logging?: { + providerEvents?: boolean; + }; +}): boolean { + return config?.logging?.providerEvents === true; +} diff --git a/src/core/logging/usageEventLogger.ts b/src/core/logging/usageEventLogger.ts new file mode 100644 index 0000000..fbefe08 --- /dev/null +++ b/src/core/logging/usageEventLogger.ts @@ -0,0 +1,108 @@ +import { appendFileSync } from 'node:fs'; +import { join } from 'node:path'; +import type { ProviderType } from '../../shared/types/provider.js'; +import type { ProviderUsageSnapshot } from '../models/response.js'; +import { USAGE_EVENTS_LOG_FILE_SUFFIX } from './contracts.js'; +import { + buildUsageEventRecord, + type MovementType, +} from './providerEvent.js'; + +export interface UsageEventLoggerConfig { + readonly logsDir: string; + readonly sessionId: string; + readonly runId: string; + readonly provider: ProviderType; + readonly providerModel: string; + readonly movement: string; + readonly movementType: MovementType; + readonly enabled: boolean; +} + +export interface UsageEventLogger { + readonly filepath: string; + setMovement(movement: string, movementType: MovementType): void; + setProvider(provider: ProviderType, providerModel: string): void; + logUsage(params: { + readonly success: boolean; + readonly usage: ProviderUsageSnapshot; + readonly timestamp?: Date; + }): void; +} + +function assertNonEmpty(value: string, field: string): void { + if (value.length === 0) { + throw new Error(`[usage-events] ${field} is required`); + } +} + +export function createUsageEventLogger(config: UsageEventLoggerConfig): UsageEventLogger { + if (config.enabled) { + assertNonEmpty(config.logsDir, 'logsDir'); + assertNonEmpty(config.sessionId, 'sessionId'); + assertNonEmpty(config.runId, 'runId'); + assertNonEmpty(config.providerModel, 'providerModel'); + assertNonEmpty(config.movement, 'movement'); + } + + const filepath = join(config.logsDir, `${config.sessionId}${USAGE_EVENTS_LOG_FILE_SUFFIX}`); + let movement = config.movement; + let movementType = config.movementType; + let provider = config.provider; + let providerModel = config.providerModel; + let hasReportedWriteFailure = false; + + return { + filepath, + setMovement(nextMovement: string, nextMovementType: MovementType): void { + assertNonEmpty(nextMovement, 'movement'); + movement = nextMovement; + movementType = nextMovementType; + }, + setProvider(nextProvider: ProviderType, nextProviderModel: string): void { + assertNonEmpty(nextProviderModel, 'providerModel'); + provider = nextProvider; + providerModel = nextProviderModel; + }, + logUsage(params: { + readonly success: boolean; + readonly usage: ProviderUsageSnapshot; + readonly timestamp?: Date; + }): void { + if (!config.enabled) { + return; + } + + const record = buildUsageEventRecord( + { + runId: config.runId, + sessionId: config.sessionId, + provider, + providerModel, + movement, + movementType, + }, + params + ); + + try { + appendFileSync(filepath, JSON.stringify(record) + '\n', 'utf-8'); + } catch (error) { + if (hasReportedWriteFailure) { + return; + } + hasReportedWriteFailure = true; + const message = error instanceof Error ? error.message : String(error); + process.stderr.write(`[takt] Failed to write usage event log: ${message}\n`); + } + }, + }; +} + +export function isUsageEventsEnabled(config?: { + logging?: { + usageEvents?: boolean; + }; +}): boolean { + return config?.logging?.usageEvents === true; +} diff --git a/src/core/models/index.ts b/src/core/models/index.ts index aca3c11..9d6cceb 100644 --- a/src/core/models/index.ts +++ b/src/core/models/index.ts @@ -11,6 +11,7 @@ export type { RuntimePrepareEntry, PieceRuntimeConfig, AgentResponse, + ProviderUsageSnapshot, SessionState, PartDefinition, PartResult, diff --git a/src/core/models/persisted-global-config.ts b/src/core/models/persisted-global-config.ts index 5d242e4..899762d 100644 --- a/src/core/models/persisted-global-config.ts +++ b/src/core/models/persisted-global-config.ts @@ -45,6 +45,8 @@ export interface LoggingConfig { debug?: boolean; /** Enable provider stream event logging (default: false when undefined) */ providerEvents?: boolean; + /** Enable usage event logging (default: false when undefined) */ + usageEvents?: boolean; } /** Analytics configuration for local metrics collection */ diff --git a/src/core/models/response.ts b/src/core/models/response.ts index 532584c..a603cf2 100644 --- a/src/core/models/response.ts +++ b/src/core/models/response.ts @@ -4,6 +4,17 @@ import type { Status, RuleMatchMethod } from './status.js'; +export interface ProviderUsageSnapshot { + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + cachedInputTokens?: number; + cacheCreationInputTokens?: number; + cacheReadInputTokens?: number; + usageMissing: boolean; + reason?: string; +} + /** Response from an agent execution */ export interface AgentResponse { persona: string; @@ -19,4 +30,6 @@ export interface AgentResponse { matchedRuleMethod?: RuleMatchMethod; /** Structured output returned by provider SDK (JSON Schema mode) */ structuredOutput?: Record; + /** Provider-native usage payload normalized for TAKT observability */ + providerUsage?: ProviderUsageSnapshot; } diff --git a/src/core/models/schemas.ts b/src/core/models/schemas.ts index b26b901..de505d2 100644 --- a/src/core/models/schemas.ts +++ b/src/core/models/schemas.ts @@ -466,6 +466,7 @@ export const LoggingConfigSchema = z.object({ trace: z.boolean().optional(), debug: z.boolean().optional(), provider_events: z.boolean().optional(), + usage_events: z.boolean().optional(), }); /** Analytics config schema */ diff --git a/src/core/models/types.ts b/src/core/models/types.ts index 00ec737..50eb559 100644 --- a/src/core/models/types.ts +++ b/src/core/models/types.ts @@ -16,6 +16,7 @@ export type { // Agent response export type { AgentResponse, + ProviderUsageSnapshot, } from './response.js'; // Session state (authoritative definition with createSessionState) diff --git a/src/features/interactive/runSessionReader.ts b/src/features/interactive/runSessionReader.ts index da4b4d1..8f1b045 100644 --- a/src/features/interactive/runSessionReader.ts +++ b/src/features/interactive/runSessionReader.ts @@ -7,6 +7,10 @@ import { existsSync, readdirSync, readFileSync } from 'node:fs'; import { join } from 'node:path'; +import { + PROVIDER_EVENTS_LOG_FILE_SUFFIX, + USAGE_EVENTS_LOG_FILE_SUFFIX, +} from '../../core/logging/contracts.js'; import { loadNdjsonLog } from '../../infra/fs/index.js'; import type { SessionLog } from '../../shared/utils/index.js'; @@ -113,7 +117,11 @@ function findSessionLogFile(logsDir: string): string | null { } const files = readdirSync(logsDir).filter( - (f) => f.endsWith('.jsonl') && !f.includes('-provider-events'), + (f) => ( + f.endsWith('.jsonl') + && !f.endsWith(PROVIDER_EVENTS_LOG_FILE_SUFFIX) + && !f.endsWith(USAGE_EVENTS_LOG_FILE_SUFFIX) + ), ); const first = files[0]; diff --git a/src/features/tasks/execute/pieceExecution.ts b/src/features/tasks/execute/pieceExecution.ts index c9577af..fe5a8e1 100644 --- a/src/features/tasks/execute/pieceExecution.ts +++ b/src/features/tasks/execute/pieceExecution.ts @@ -13,6 +13,8 @@ import { TaskPrefixWriter } from '../../../shared/ui/TaskPrefixWriter.js'; import { generateSessionId, createSessionLog, finalizeSessionLog, initNdjsonLog } from '../../../infra/fs/index.js'; import { createLogger, notifySuccess, notifyError, preventSleep, generateReportDir, isValidReportDirName, getDebugPromptsLogFile } from '../../../shared/utils/index.js'; import { createProviderEventLogger, isProviderEventsEnabled } from '../../../shared/utils/providerEventLogger.js'; +import { createUsageEventLogger, isUsageEventsEnabled } from '../../../shared/utils/usageEventLogger.js'; +import { USAGE_MISSING_REASONS } from '../../../core/logging/contracts.js'; import { getLabel } from '../../../shared/i18n/index.js'; import { buildRunPaths } from '../../../core/piece/run/run-paths.js'; import { resolveRuntimeConfig } from '../../../core/runtime/runtime-environment.js'; @@ -24,7 +26,7 @@ import { AnalyticsEmitter } from './analyticsEmitter.js'; import { createOutputFns, createPrefixedStreamHandler } from './outputFns.js'; import { RunMetaManager } from './runMeta.js'; import { createIterationLimitHandler, createUserInputHandler } from './iterationLimitHandler.js'; -import { assertTaskPrefixPair, truncate, formatElapsedTime } from './pieceExecutionUtils.js'; +import { assertTaskPrefixPair, truncate, formatElapsedTime, detectMovementType } from './pieceExecutionUtils.js'; import { createTraceReportWriter } from './traceReportWriter.js'; import { sanitizeTextForStorage } from './traceReportRedaction.js'; export type { PieceExecutionResult, PieceExecutionOptions }; @@ -46,7 +48,6 @@ export async function executePiece( const isRetry = Boolean(options.startMovement || options.retryNote); log.debug('Session mode', { isRetry, isWorktree: cwd !== projectCwd }); out.header(`${headerPrefix} ${pieceConfig.name}`); - const pieceSessionId = generateSessionId(); const runSlug = options.reportDirName ?? generateReportDir(task); if (!isValidReportDirName(runSlug)) throw new Error(`Invalid reportDirName: ${runSlug}`); @@ -61,10 +62,7 @@ export async function executePiece( displayRef.current.createHandler()(event); }; const isWorktree = cwd !== projectCwd; - const globalConfig = resolvePieceConfigValues( - projectCwd, - ['notificationSound', 'notificationSoundEvents', 'provider', 'runtime', 'preventSleep', 'model', 'logging', 'analytics'], - ); + const globalConfig = resolvePieceConfigValues(projectCwd, ['notificationSound', 'notificationSoundEvents', 'provider', 'runtime', 'preventSleep', 'model', 'logging', 'analytics']); const traceReportMode = globalConfig.logging?.trace === true ? 'full' : 'redacted'; const allowSensitiveData = traceReportMode === 'full'; const ndjsonLogPath = initNdjsonLog( @@ -74,9 +72,7 @@ export async function executePiece( { logsDir: runPaths.logsAbs }, ); const sessionLogger = new SessionLogger(ndjsonLogPath, allowSensitiveData); - if (options.interactiveMetadata) { - sessionLogger.writeInteractiveMetadata(options.interactiveMetadata); - } + if (options.interactiveMetadata) sessionLogger.writeInteractiveMetadata(options.interactiveMetadata); const shouldNotify = globalConfig.notificationSound !== false; const nse = globalConfig.notificationSoundEvents; const shouldNotifyIterationLimit = shouldNotify && nse?.iterationLimit !== false; @@ -98,6 +94,16 @@ export async function executePiece( movement: options.startMovement ?? pieceConfig.initialMovement, enabled: isProviderEventsEnabled(globalConfig), }); + const usageEventLogger = createUsageEventLogger({ + logsDir: runPaths.logsAbs, + sessionId: pieceSessionId, + runId: runSlug, + provider: currentProvider, + providerModel: configuredModel ?? '(default)', + movement: options.startMovement ?? pieceConfig.initialMovement, + movementType: 'normal', + enabled: isUsageEventsEnabled(globalConfig), + }); initAnalyticsWriter(globalConfig.analytics?.enabled === true, globalConfig.analytics?.eventsPath ?? join(getGlobalConfigDir(), 'analytics', 'events')); if (globalConfig.preventSleep) preventSleep(); const analyticsEmitter = new AnalyticsEmitter(runSlug, currentProvider, configuredModel ?? '(default)'); @@ -107,7 +113,6 @@ export async function executePiece( const sessionUpdateHandler = isWorktree ? (personaName: string, personaSessionId: string) => updateWorktreeSession(projectCwd, cwd, personaName, personaSessionId, currentProvider) : (persona: string, personaSessionId: string) => updatePersonaSession(projectCwd, persona, personaSessionId, currentProvider); - const iterationLimitHandler = createIterationLimitHandler( out, displayRef, @@ -121,7 +126,6 @@ export async function executePiece( }, ); const onUserInput = interactiveUserInput ? createUserInputHandler(out, displayRef) : undefined; - let abortReason: string | undefined; let exceededInfo: ExceededInfo | undefined; let lastMovementContent: string | undefined; @@ -174,13 +178,11 @@ export async function executePiece( log.debug('Phase starting', { step: step.name, phase, phaseName }); sessionLogger.onPhaseStart(step, phase, phaseName, instruction, promptParts, phaseExecutionId, iteration); }); - engine.on('phase:complete', (step, phase, phaseName, content, phaseStatus, phaseError, phaseExecutionId, iteration) => { log.debug('Phase completed', { step: step.name, phase, phaseName, status: phaseStatus }); sessionLogger.setIteration(currentIteration); sessionLogger.onPhaseComplete(step, phase, phaseName, content, phaseStatus, phaseError, phaseExecutionId, iteration); }); - engine.on('phase:judge_stage', (step, phase, phaseName, entry, phaseExecutionId, iteration) => { sessionLogger.onJudgeStage(step, phase, phaseName, entry, phaseExecutionId, iteration); }); @@ -195,6 +197,8 @@ export async function executePiece( const movementModel = providerInfo.model ?? (movementProvider === currentProvider ? configuredModel : undefined) ?? '(default)'; providerEventLogger.setMovement(step.name); providerEventLogger.setProvider(movementProvider); + usageEventLogger.setMovement(step.name, detectMovementType(step)); + usageEventLogger.setProvider(movementProvider, movementModel); out.info(`Provider: ${movementProvider}`); out.info(`Model: ${movementModel}`); if (instruction) log.debug('Step instruction', instruction); @@ -210,7 +214,6 @@ export async function executePiece( } sessionLogger.onMovementStart(step, iteration, instruction); }); - engine.on('movement:complete', (step, response, instruction) => { log.debug('Movement completed', { step: step.name, status: response.status, matchedRuleIndex: response.matchedRuleIndex, matchedRuleMethod: response.matchedRuleMethod, contentLength: response.content.length, sessionId: response.sessionId, error: response.error }); lastMovementContent = response.content; @@ -227,17 +230,16 @@ export async function executePiece( } if (response.error) out.error(`Error: ${response.error}`); if (response.sessionId) out.status('Session', response.sessionId); + usageEventLogger.logUsage({ success: response.status === 'done', usage: response.providerUsage ?? { usageMissing: true, reason: USAGE_MISSING_REASONS.NOT_AVAILABLE } }); sessionLogger.onMovementComplete(step, response, instruction); analyticsEmitter.onMovementComplete(step, response); sessionLog = { ...sessionLog, iterations: sessionLog.iterations + 1 }; }); - engine.on('movement:report', (step, filePath, fileName) => { out.logLine(`\n📄 Report: ${fileName}\n`); out.logLine(readFileSync(filePath, 'utf-8')); analyticsEmitter.onMovementReport(step, filePath); }); - engine.on('piece:complete', (state) => { log.info('Piece completed successfully', { iterations: state.iteration }); sessionLog = finalizeSessionLog(sessionLog, 'completed'); @@ -256,7 +258,6 @@ export async function executePiece( out.info(`Session log: ${ndjsonLogPath}`); if (shouldNotifyPieceComplete) notifySuccess('TAKT', getLabel('piece.notifyComplete', undefined, { iteration: String(state.iteration) })); }); - engine.on('piece:abort', (state, reason) => { interruptAllQueries(); log.error('Piece aborted', { reason, iterations: state.iteration }); @@ -280,7 +281,6 @@ export async function executePiece( out.info(`Session log: ${ndjsonLogPath}`); if (shouldNotifyPieceAbort) notifyError('TAKT', getLabel('piece.notifyAbort', undefined, { reason })); }); - const finalState = await engine.run(); return { success: finalState.status === 'completed', diff --git a/src/features/tasks/execute/pieceExecutionUtils.ts b/src/features/tasks/execute/pieceExecutionUtils.ts index 860d5ba..677c538 100644 --- a/src/features/tasks/execute/pieceExecutionUtils.ts +++ b/src/features/tasks/execute/pieceExecutionUtils.ts @@ -18,3 +18,10 @@ export function formatElapsedTime(startTime: string, endTime: string): string { } return `${Math.floor(elapsedSec / 60)}m ${Math.floor(elapsedSec % 60)}s`; } + +export function detectMovementType(step: { parallel?: unknown; arpeggio?: unknown; teamLeader?: unknown }): 'normal' | 'parallel' | 'arpeggio' | 'team_leader' { + if (step.parallel) return 'parallel'; + if (step.arpeggio) return 'arpeggio'; + if (step.teamLeader) return 'team_leader'; + return 'normal'; +} diff --git a/src/infra/claude/client.ts b/src/infra/claude/client.ts index cdb1174..2ca5057 100644 --- a/src/infra/claude/client.ts +++ b/src/infra/claude/client.ts @@ -79,6 +79,7 @@ export class ClaudeClient { sessionId: result.sessionId, error: result.error, structuredOutput: result.structuredOutput, + providerUsage: result.providerUsage, }; } @@ -108,6 +109,7 @@ export class ClaudeClient { sessionId: result.sessionId, error: result.error, structuredOutput: result.structuredOutput, + providerUsage: result.providerUsage, }; } @@ -158,6 +160,7 @@ export class ClaudeClient { sessionId: result.sessionId, error: result.error, structuredOutput: result.structuredOutput, + providerUsage: result.providerUsage, }; } diff --git a/src/infra/claude/executor.ts b/src/infra/claude/executor.ts index 0365dad..285d262 100644 --- a/src/infra/claude/executor.ts +++ b/src/infra/claude/executor.ts @@ -11,6 +11,8 @@ import { type SDKResultMessage, type SDKAssistantMessage, } from '@anthropic-ai/claude-agent-sdk'; +import { USAGE_MISSING_REASONS } from '../../core/logging/contracts.js'; +import type { ProviderUsageSnapshot } from '../../core/models/response.js'; import { createLogger, getErrorMessage } from '../../shared/utils/index.js'; import { generateQueryId, @@ -26,6 +28,55 @@ import type { const log = createLogger('claude-sdk'); +function toNumber(value: unknown): number | undefined { + return typeof value === 'number' && Number.isFinite(value) ? value : undefined; +} + +function extractProviderUsage(resultMsg: SDKResultMessage): ProviderUsageSnapshot { + const rawUsage = (resultMsg as unknown as { usage?: unknown }).usage; + if (!rawUsage || typeof rawUsage !== 'object') { + return { + usageMissing: true, + reason: USAGE_MISSING_REASONS.NOT_AVAILABLE, + }; + } + + const usage = rawUsage as Record; + const inputTokens = toNumber(usage.input_tokens); + const outputTokens = toNumber(usage.output_tokens); + const cacheCreationInputTokens = toNumber(usage.cache_creation_input_tokens); + const cacheReadInputTokens = toNumber(usage.cache_read_input_tokens); + if (inputTokens === undefined || outputTokens === undefined) { + return { + usageMissing: true, + reason: USAGE_MISSING_REASONS.TOKENS_MISSING, + }; + } + const totalTokens = inputTokens + outputTokens; + const cachedInputTokens = ( + cacheCreationInputTokens !== undefined && cacheReadInputTokens !== undefined + ? cacheCreationInputTokens + cacheReadInputTokens + : cacheReadInputTokens ?? cacheCreationInputTokens + ); + + const providerUsage: ProviderUsageSnapshot = { + inputTokens, + outputTokens, + totalTokens, + usageMissing: false, + }; + if (cachedInputTokens !== undefined) { + providerUsage.cachedInputTokens = cachedInputTokens; + } + if (cacheCreationInputTokens !== undefined) { + providerUsage.cacheCreationInputTokens = cacheCreationInputTokens; + } + if (cacheReadInputTokens !== undefined) { + providerUsage.cacheReadInputTokens = cacheReadInputTokens; + } + return providerUsage; +} + /** * Executes Claude queries using the Agent SDK. * @@ -95,6 +146,7 @@ export class QueryExecutor { let hasResultMessage = false; let accumulatedAssistantText = ''; let structuredOutput: Record | undefined; + let providerUsage: ProviderUsageSnapshot | undefined; let onExternalAbort: (() => void) | undefined; try { @@ -138,6 +190,7 @@ export class QueryExecutor { if (message.type === 'result') { hasResultMessage = true; const resultMsg = message as SDKResultMessage; + providerUsage = extractProviderUsage(resultMsg); if (resultMsg.subtype === 'success') { resultContent = resultMsg.result; const rawStructuredOutput = (resultMsg as unknown as { @@ -176,13 +229,18 @@ export class QueryExecutor { hasResultMessage, }); - return { + const response: ClaudeResult = { success, content: finalContent.trim(), sessionId, fullContent: accumulatedAssistantText.trim(), structuredOutput, + providerUsage: providerUsage ?? { + usageMissing: true, + reason: USAGE_MISSING_REASONS.NOT_AVAILABLE, + }, }; + return response; } catch (error) { if (onExternalAbort && options.abortSignal) { options.abortSignal.removeEventListener('abort', onExternalAbort); diff --git a/src/infra/claude/types.ts b/src/infra/claude/types.ts index 69dac84..8e833cb 100644 --- a/src/infra/claude/types.ts +++ b/src/infra/claude/types.ts @@ -7,6 +7,7 @@ import type { PermissionUpdate, AgentDefinition, SandboxSettings } from '@anthropic-ai/claude-agent-sdk'; import type { PermissionMode, McpServerConfig } from '../../core/models/index.js'; +import type { ProviderUsageSnapshot } from '../../core/models/response.js'; export type { SandboxSettings }; import type { PermissionResult } from '../../core/piece/index.js'; @@ -113,6 +114,8 @@ export interface ClaudeResult { fullContent?: string; /** Structured output returned by Claude SDK */ structuredOutput?: Record; + /** Provider-native usage payload normalized for TAKT observability */ + providerUsage?: ProviderUsageSnapshot; } /** Extended result with query ID for concurrent execution */ diff --git a/src/infra/codex/client.ts b/src/infra/codex/client.ts index cea67d0..3459c50 100644 --- a/src/infra/codex/client.ts +++ b/src/infra/codex/client.ts @@ -5,7 +5,8 @@ */ import { Codex, type TurnOptions } from '@openai/codex-sdk'; -import type { AgentResponse } from '../../core/models/index.js'; +import { USAGE_MISSING_REASONS } from '../../core/logging/contracts.js'; +import type { AgentResponse, ProviderUsageSnapshot } from '../../core/models/index.js'; import { createLogger, getErrorMessage, createStreamDiagnostics, parseStructuredOutput, type StreamDiagnostics } from '../../shared/utils/index.js'; import { mapToCodexSandboxMode, type CodexCallOptions } from './types.js'; import { @@ -38,6 +39,49 @@ const CODEX_RETRYABLE_ERROR_PATTERNS = [ 'fetch failed', ]; +function toNumber(value: unknown): number | undefined { + return typeof value === 'number' && Number.isFinite(value) ? value : undefined; +} + +function extractProviderUsageFromTurnCompleted(event: CodexEvent): ProviderUsageSnapshot { + const usageRaw = event.usage; + if (!usageRaw || typeof usageRaw !== 'object') { + return { + usageMissing: true, + reason: USAGE_MISSING_REASONS.NOT_AVAILABLE, + }; + } + + const usage = usageRaw as Record; + const inputTokens = toNumber(usage.input_tokens); + const outputTokens = toNumber(usage.output_tokens); + const explicitTotal = toNumber(usage.total_tokens); + const totalTokens = explicitTotal ?? ( + inputTokens !== undefined && outputTokens !== undefined + ? inputTokens + outputTokens + : undefined + ); + const cachedInputTokens = toNumber(usage.cached_input_tokens); + if (inputTokens === undefined || outputTokens === undefined || totalTokens === undefined) { + return { + usageMissing: true, + reason: USAGE_MISSING_REASONS.TOKENS_MISSING, + }; + } + + const providerUsage: ProviderUsageSnapshot = { + inputTokens, + outputTokens, + totalTokens, + usageMissing: false, + }; + if (cachedInputTokens !== undefined) { + providerUsage.cachedInputTokens = cachedInputTokens; + } + + return providerUsage; +} + /** * Client for Codex SDK agent interactions. * @@ -167,6 +211,7 @@ export class CodexClient { const contentOffsets = new Map(); let success = true; let failureMessage = ''; + let providerUsage: ProviderUsageSnapshot | undefined; const state = createStreamTrackingState(); for await (const event of events as AsyncGenerator) { @@ -180,6 +225,11 @@ export class CodexClient { continue; } + if (event.type === 'turn.completed') { + providerUsage = extractProviderUsageFromTurnCompleted(event); + continue; + } + if (event.type === 'turn.failed') { success = false; if (event.error && typeof event.error === 'object' && 'message' in event.error) { @@ -280,14 +330,19 @@ export class CodexClient { const structuredOutput = parseStructuredOutput(trimmed, !!options.outputSchema); emitResult(options.onStream, true, trimmed, currentThreadId); - return { + const response: AgentResponse = { persona: agentType, status: 'done', content: trimmed, timestamp: new Date(), sessionId: currentThreadId, structuredOutput, + providerUsage: providerUsage ?? { + usageMissing: true, + reason: USAGE_MISSING_REASONS.NOT_AVAILABLE, + }, }; + return response; } catch (error) { const message = getErrorMessage(error); const errorMessage = streamAbortController.signal.aborted diff --git a/src/infra/config/global/globalConfigCore.ts b/src/infra/config/global/globalConfigCore.ts index d795e79..dddc813 100644 --- a/src/infra/config/global/globalConfigCore.ts +++ b/src/infra/config/global/globalConfigCore.ts @@ -97,6 +97,7 @@ export class GlobalConfigManager { trace: parsed.logging.trace, debug: parsed.logging.debug, providerEvents: parsed.logging.provider_events, + usageEvents: parsed.logging.usage_events, } : undefined, analytics: parsed.analytics ? { enabled: parsed.analytics.enabled, diff --git a/src/infra/config/global/globalConfigSerializer.ts b/src/infra/config/global/globalConfigSerializer.ts index 218bf08..9a8e783 100644 --- a/src/infra/config/global/globalConfigSerializer.ts +++ b/src/infra/config/global/globalConfigSerializer.ts @@ -18,12 +18,14 @@ export function serializeGlobalConfig(config: PersistedGlobalConfig): Record; -} - -const MAX_TEXT_LENGTH = 10_000; -const HEAD_LENGTH = 5_000; -const TAIL_LENGTH = 2_000; -const TRUNCATED_MARKER = '...[truncated]'; - -function truncateString(value: string): string { - if (value.length <= MAX_TEXT_LENGTH) { - return value; - } - return value.slice(0, HEAD_LENGTH) + TRUNCATED_MARKER + value.slice(-TAIL_LENGTH); -} - -function sanitizeData(data: Record): Record { - return Object.fromEntries( - Object.entries(data).map(([key, value]) => { - if (typeof value === 'string') { - return [key, truncateString(value)]; - } - return [key, value]; - }) - ); -} - -function pickString(source: Record, keys: string[]): string | undefined { - for (const key of keys) { - const value = source[key]; - if (typeof value === 'string' && value.length > 0) { - return value; - } - } - return undefined; -} - -function buildLogRecord( - event: StreamEvent, - provider: ProviderType, - movement: string, - runId: string, -): ProviderEventLogRecord { - const data = sanitizeData(event.data as unknown as Record); - const sessionId = pickString(data, ['session_id', 'sessionId', 'sessionID', 'thread_id', 'threadId']); - const messageId = pickString(data, ['message_id', 'messageId', 'item_id', 'itemId']); - const callId = pickString(data, ['call_id', 'callId', 'id']); - const requestId = pickString(data, ['request_id', 'requestId']); - - return { - timestamp: new Date().toISOString(), - provider, - event_type: event.type, - run_id: runId, - movement, - ...(sessionId ? { session_id: sessionId } : {}), - ...(messageId ? { message_id: messageId } : {}), - ...(callId ? { call_id: callId } : {}), - ...(requestId ? { request_id: requestId } : {}), - data, - }; -} - -export function createProviderEventLogger(config: ProviderEventLoggerConfig): ProviderEventLogger { - const filepath = join(config.logsDir, `${config.sessionId}-provider-events.jsonl`); - let movement = config.movement; - let provider = config.provider; - let hasReportedWriteFailure = false; - - const write = (event: StreamEvent): void => { - try { - const record = buildLogRecord(event, provider, movement, config.runId); - appendFileSync(filepath, JSON.stringify(record) + '\n', 'utf-8'); - } catch (error) { - if (hasReportedWriteFailure) { - return; - } - hasReportedWriteFailure = true; - const message = error instanceof Error ? error.message : String(error); - process.stderr.write(`[takt] Failed to write provider event log: ${message}\n`); - } - }; - - return { - filepath, - setMovement(nextMovement: string): void { - movement = nextMovement; - }, - setProvider(nextProvider: ProviderType): void { - provider = nextProvider; - }, - wrapCallback(original?: StreamCallback): StreamCallback { - if (!config.enabled && original) { - return original; - } - if (!config.enabled) { - return () => {}; - } - - return (event: StreamEvent): void => { - write(event); - original?.(event); - }; - }, - }; -} - -export function isProviderEventsEnabled(config?: { - logging?: { - providerEvents?: boolean; - }; -}): boolean { - return config?.logging?.providerEvents === true; -} +export { + createProviderEventLogger, + isProviderEventsEnabled, +} from '../../core/logging/providerEventLogger.js'; +export type { + ProviderEventLogger, + ProviderEventLoggerConfig, +} from '../../core/logging/providerEventLogger.js'; diff --git a/src/shared/utils/usageEventLogger.ts b/src/shared/utils/usageEventLogger.ts new file mode 100644 index 0000000..8a90bdc --- /dev/null +++ b/src/shared/utils/usageEventLogger.ts @@ -0,0 +1,8 @@ +export { + createUsageEventLogger, + isUsageEventsEnabled, +} from '../../core/logging/usageEventLogger.js'; +export type { + UsageEventLogger, + UsageEventLoggerConfig, +} from '../../core/logging/usageEventLogger.js';