takt: implement-usage-event-logging (#470)

This commit is contained in:
nrs 2026-03-05 11:27:59 +09:00 committed by GitHub
parent 2f268f6d43
commit 76cfd771f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 1246 additions and 167 deletions

View File

@ -161,4 +161,49 @@ describe('QueryExecutor — structuredOutput 抽出', () => {
expect(result.content).toBe('final text');
expect(result.structuredOutput).toEqual({ step: 1, reason: 'approved' });
});
it('result メッセージの usage を providerUsage として抽出する', async () => {
mockQuery.mockReturnValue(createMockQuery([
{
type: 'result',
subtype: 'success',
result: 'done',
usage: {
input_tokens: 12,
output_tokens: 34,
cache_creation_input_tokens: 5,
cache_read_input_tokens: 7,
},
},
]));
const executor = new QueryExecutor();
const result = await executor.execute('test', { cwd: '/tmp' });
const providerUsage = result.providerUsage;
expect(providerUsage).toEqual({
inputTokens: 12,
outputTokens: 34,
totalTokens: 46,
cachedInputTokens: 12,
cacheCreationInputTokens: 5,
cacheReadInputTokens: 7,
usageMissing: false,
});
});
it('usage が存在しない場合は usageMissing=true と reason を返す', async () => {
mockQuery.mockReturnValue(createMockQuery([
{ type: 'result', subtype: 'success', result: 'done' },
]));
const executor = new QueryExecutor();
const result = await executor.execute('test', { cwd: '/tmp' });
const providerUsage = result.providerUsage;
expect(providerUsage).toMatchObject({
usageMissing: true,
reason: 'usage_not_available',
});
});
});

View File

@ -191,4 +191,42 @@ describe('CodexClient — structuredOutput 抽出', () => {
codexPathOverride: '/opt/codex/bin/codex',
});
});
it('turn.completed の usage を providerUsage として返す', async () => {
mockEvents = [
{ type: 'thread.started', thread_id: 'thread-1' },
{
type: 'turn.completed',
usage: { input_tokens: 11, output_tokens: 22, cached_input_tokens: 3 },
},
];
const client = new CodexClient();
const result = await client.call('coder', 'prompt', { cwd: '/tmp' });
const providerUsage = result.providerUsage;
expect(providerUsage).toEqual({
inputTokens: 11,
outputTokens: 22,
totalTokens: 33,
cachedInputTokens: 3,
usageMissing: false,
});
});
it('turn.completed に usage がない場合は usageMissing=true と reason を返す', async () => {
mockEvents = [
{ type: 'thread.started', thread_id: 'thread-1' },
{ type: 'turn.completed' },
];
const client = new CodexClient();
const result = await client.call('coder', 'prompt', { cwd: '/tmp' });
const providerUsage = result.providerUsage;
expect(providerUsage).toMatchObject({
usageMissing: true,
reason: 'usage_not_available',
});
});
});

View File

@ -503,6 +503,7 @@ describe('loadGlobalConfig', () => {
'language: en',
'logging:',
' provider_events: false',
' usage_events: true',
].join('\n'),
'utf-8',
);
@ -510,6 +511,7 @@ describe('loadGlobalConfig', () => {
const config = loadGlobalConfig();
expect(config.logging).toEqual({
providerEvents: false,
usageEvents: true,
});
});
@ -525,6 +527,7 @@ describe('loadGlobalConfig', () => {
' trace: true',
' debug: true',
' provider_events: true',
' usage_events: false',
].join('\n'),
'utf-8',
);
@ -535,6 +538,7 @@ describe('loadGlobalConfig', () => {
trace: true,
debug: true,
providerEvents: true,
usageEvents: false,
});
});
@ -549,6 +553,7 @@ describe('loadGlobalConfig', () => {
trace: false,
debug: true,
providerEvents: false,
usageEvents: true,
};
saveGlobalConfig(config);
invalidateGlobalConfigCache();
@ -559,6 +564,7 @@ describe('loadGlobalConfig', () => {
trace: false,
debug: true,
providerEvents: false,
usageEvents: true,
});
});
@ -580,6 +586,24 @@ describe('loadGlobalConfig', () => {
});
});
it('should save partial logging config (only usage_events)', () => {
const taktDir = join(testHomeDir, '.takt');
mkdirSync(taktDir, { recursive: true });
writeFileSync(getGlobalConfigPath(), 'language: en\n', 'utf-8');
const config = loadGlobalConfig();
config.logging = {
usageEvents: true,
};
saveGlobalConfig(config);
invalidateGlobalConfigCache();
const reloaded = loadGlobalConfig();
expect(reloaded.logging).toEqual({
usageEvents: true,
});
});
it('should save and reload notification_sound_events config', () => {
const taktDir = join(testHomeDir, '.takt');

View File

@ -0,0 +1,118 @@
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { mkdirSync, rmSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import {
PROVIDER_EVENTS_LOG_FILE_SUFFIX,
USAGE_EVENTS_LOG_FILE_SUFFIX,
USAGE_MISSING_REASONS,
} from '../core/logging/contracts.js';
import { buildUsageEventRecord } from '../core/logging/providerEvent.js';
import { createProviderEventLogger } from '../core/logging/providerEventLogger.js';
import { createUsageEventLogger } from '../core/logging/usageEventLogger.js';
import type { ProviderUsageSnapshot } from '../core/models/response.js';
// Verifies that provider-event and usage-event logging share the same file
// suffix and snapshot contracts exported from core/logging/contracts.
describe('logging contracts', () => {
  let tempDir: string;
  beforeEach(() => {
    // Unique temp dir per test so parallel/retried runs cannot collide.
    tempDir = join(tmpdir(), `takt-logging-contracts-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`);
    mkdirSync(tempDir, { recursive: true });
  });
  afterEach(() => {
    rmSync(tempDir, { recursive: true, force: true });
  });
  it('should use shared file suffix contracts for provider and usage loggers', () => {
    // enabled: false — loggers still compute a filepath without writing.
    const providerLogger = createProviderEventLogger({
      logsDir: tempDir,
      sessionId: 'session-a',
      runId: 'run-a',
      provider: 'claude',
      movement: 'plan',
      enabled: false,
    });
    const usageLogger = createUsageEventLogger({
      logsDir: tempDir,
      sessionId: 'session-b',
      runId: 'run-b',
      provider: 'codex',
      providerModel: 'gpt-5-codex',
      movement: 'implement',
      movementType: 'normal',
      enabled: false,
    });
    expect(providerLogger.filepath.endsWith(PROVIDER_EVENTS_LOG_FILE_SUFFIX)).toBe(true);
    expect(usageLogger.filepath.endsWith(USAGE_EVENTS_LOG_FILE_SUFFIX)).toBe(true);
  });
  it('should accept shared ProviderUsageSnapshot contract in usage record builder', () => {
    const usage: ProviderUsageSnapshot = {
      inputTokens: 12,
      outputTokens: 8,
      totalTokens: 20,
      cachedInputTokens: 4,
      cacheCreationInputTokens: 2,
      cacheReadInputTokens: 2,
      usageMissing: false,
    };
    const record = buildUsageEventRecord(
      {
        runId: 'run-1',
        sessionId: 'session-1',
        provider: 'claude',
        providerModel: 'sonnet',
        movement: 'implement',
        movementType: 'normal',
      },
      { success: true, usage, timestamp: new Date('2026-03-04T12:00:00.000Z') },
    );
    // camelCase snapshot field is persisted under the snake_case log key.
    expect(record.usage.cached_input_tokens).toBe(4);
  });
  it('should reject usage_missing records with unknown reason values', () => {
    // Reason values outside USAGE_MISSING_REASONS must be rejected…
    expect(() =>
      buildUsageEventRecord(
        {
          runId: 'run-2',
          sessionId: 'session-2',
          provider: 'opencode',
          providerModel: 'openai/gpt-4.1',
          movement: 'implement',
          movementType: 'normal',
        },
        {
          success: true,
          usage: {
            usageMissing: true,
            reason: 'invalid_reason',
          } as ProviderUsageSnapshot,
        },
      ),
    ).toThrow('[usage-events] reason is invalid');
    // …while the canonical constants are accepted.
    expect(() =>
      buildUsageEventRecord(
        {
          runId: 'run-3',
          sessionId: 'session-3',
          provider: 'opencode',
          providerModel: 'openai/gpt-4.1',
          movement: 'implement',
          movementType: 'normal',
        },
        {
          success: true,
          usage: {
            usageMissing: true,
            reason: USAGE_MISSING_REASONS.NOT_AVAILABLE,
          },
        },
      ),
    ).not.toThrow();
  });
});

View File

@ -614,11 +614,13 @@ describe('GlobalConfigSchema', () => {
const config = {
logging: {
provider_events: false,
usage_events: true,
},
};
const result = GlobalConfigSchema.parse(config);
expect(result.logging?.provider_events).toBe(false);
expect(result.logging?.usage_events).toBe(true);
});
it('should accept full logging config with all fields', () => {
@ -628,6 +630,7 @@ describe('GlobalConfigSchema', () => {
trace: true,
debug: true,
provider_events: true,
usage_events: false,
},
};
@ -636,6 +639,7 @@ describe('GlobalConfigSchema', () => {
expect(result.logging?.trace).toBe(true);
expect(result.logging?.debug).toBe(true);
expect(result.logging?.provider_events).toBe(true);
expect(result.logging?.usage_events).toBe(false);
});
it('should accept partial logging config', () => {
@ -650,6 +654,7 @@ describe('GlobalConfigSchema', () => {
expect(result.logging?.trace).toBeUndefined();
expect(result.logging?.debug).toBeUndefined();
expect(result.logging?.provider_events).toBeUndefined();
expect(result.logging?.usage_events).toBeUndefined();
});
it('should reject invalid logging level', () => {

View File

@ -6,14 +6,45 @@
*/
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { USAGE_MISSING_REASONS } from '../core/logging/contracts.js';
import type { PieceConfig } from '../core/models/index.js';
const { MockPieceEngine, mockLoadPersonaSessions, mockLoadWorktreeSessions } = vi.hoisted(() => {
const {
MockPieceEngine,
mockLoadPersonaSessions,
mockLoadWorktreeSessions,
mockCreateUsageEventLogger,
mockUsageLogger,
mockMovementResponse,
} = vi.hoisted(() => {
// eslint-disable-next-line @typescript-eslint/no-require-imports
const { EventEmitter: EE } = require('node:events') as typeof import('node:events');
const mockLoadPersonaSessions = vi.fn().mockReturnValue({ coder: 'saved-session-id' });
const mockLoadWorktreeSessions = vi.fn().mockReturnValue({ coder: 'worktree-session-id' });
const mockUsageLogger = {
filepath: '/tmp/test-usage-events.jsonl',
setMovement: vi.fn(),
setProvider: vi.fn(),
logUsage: vi.fn(),
};
const mockCreateUsageEventLogger = vi.fn().mockReturnValue(mockUsageLogger);
const mockMovementResponse: {
providerUsage: {
inputTokens?: number;
outputTokens?: number;
totalTokens?: number;
usageMissing: boolean;
reason?: string;
} | undefined;
} = {
providerUsage: {
inputTokens: 3,
outputTokens: 2,
totalTokens: 5,
usageMissing: false,
},
};
type PersonaProviderMap = Record<string, { provider?: string; model?: string }>;
@ -47,13 +78,28 @@ const { MockPieceEngine, mockLoadPersonaSessions, mockLoadWorktreeSessions } = v
if (firstStep) {
const providerInfo = resolveProviderInfo(firstStep, this.receivedOptions);
this.emit('movement:start', firstStep, 1, firstStep.instructionTemplate, providerInfo);
this.emit('movement:complete', firstStep, {
persona: firstStep.personaDisplayName,
status: 'done',
content: 'ok',
timestamp: new Date('2026-03-04T00:00:00.000Z'),
sessionId: 'movement-session',
providerUsage: mockMovementResponse.providerUsage,
}, firstStep.instructionTemplate);
}
this.emit('piece:complete', { status: 'completed', iteration: 1 });
return { status: 'completed', iteration: 1 };
}
}
return { MockPieceEngine, mockLoadPersonaSessions, mockLoadWorktreeSessions };
return {
MockPieceEngine,
mockLoadPersonaSessions,
mockLoadWorktreeSessions,
mockCreateUsageEventLogger,
mockUsageLogger,
mockMovementResponse,
};
});
vi.mock('../core/piece/index.js', async () => {
@ -146,6 +192,10 @@ vi.mock('../shared/prompt/index.js', () => ({
selectOption: vi.fn(),
promptInput: vi.fn(),
}));
vi.mock('../shared/utils/usageEventLogger.js', () => ({
createUsageEventLogger: mockCreateUsageEventLogger,
isUsageEventsEnabled: vi.fn().mockReturnValue(true),
}));
vi.mock('../shared/i18n/index.js', () => ({
getLabel: vi.fn().mockImplementation((key: string) => key),
@ -188,12 +238,30 @@ function makeConfig(): PieceConfig {
};
}
function makeConfigWithMovement(overrides: Record<string, unknown>): PieceConfig {
const baseMovement = makeConfig().movements[0];
if (!baseMovement) {
throw new Error('Base movement is required');
}
return {
...makeConfig(),
movements: [{ ...baseMovement, ...overrides }],
};
}
describe('executePiece session loading', () => {
beforeEach(() => {
vi.clearAllMocks();
mockCreateUsageEventLogger.mockReturnValue(mockUsageLogger);
vi.mocked(resolvePieceConfigValues).mockReturnValue({ ...defaultResolvedConfigValues });
mockLoadPersonaSessions.mockReturnValue({ coder: 'saved-session-id' });
mockLoadWorktreeSessions.mockReturnValue({ coder: 'worktree-session-id' });
mockMovementResponse.providerUsage = {
inputTokens: 3,
outputTokens: 2,
totalTokens: 5,
usageMissing: false,
};
});
it('should pass empty initialSessions on normal run', async () => {
@ -208,6 +276,41 @@ describe('executePiece session loading', () => {
expect(MockPieceEngine.lastInstance.receivedOptions.initialSessions).toEqual({});
});
it('should log usage events on movement completion when usage logging is enabled', async () => {
await executePiece(makeConfig(), 'task', '/tmp/project', {
projectCwd: '/tmp/project',
});
expect(mockCreateUsageEventLogger).toHaveBeenCalledOnce();
expect(mockUsageLogger.setMovement).toHaveBeenCalledWith('implement', 'normal');
expect(mockUsageLogger.setProvider).toHaveBeenCalledWith('claude', '(default)');
expect(mockUsageLogger.logUsage).toHaveBeenCalledWith({
success: true,
usage: {
inputTokens: 3,
outputTokens: 2,
totalTokens: 5,
usageMissing: false,
},
});
});
it('should log usage_missing reason when provider usage is unavailable', async () => {
mockMovementResponse.providerUsage = undefined;
await executePiece(makeConfig(), 'task', '/tmp/project', {
projectCwd: '/tmp/project',
});
expect(mockUsageLogger.logUsage).toHaveBeenCalledWith({
success: true,
usage: {
usageMissing: true,
reason: USAGE_MISSING_REASONS.NOT_AVAILABLE,
},
});
});
it('should load persisted sessions when startMovement is set (retry)', async () => {
// Given: retry execution with startMovement
await executePiece(makeConfig(), 'task', '/tmp/project', {
@ -314,4 +417,33 @@ describe('executePiece session loading', () => {
expect(mockInfo).toHaveBeenCalledWith('Provider: opencode');
expect(mockInfo).toHaveBeenCalledWith('Model: gpt-5');
});
it('should pass movement type to usage logger for parallel movement', async () => {
await executePiece(makeConfigWithMovement({ parallel: { branches: [] } }), 'task', '/tmp/project', {
projectCwd: '/tmp/project',
});
expect(mockUsageLogger.setMovement).toHaveBeenCalledWith('implement', 'parallel');
});
it('should pass movement type to usage logger for arpeggio movement', async () => {
await executePiece(makeConfigWithMovement({ arpeggio: { source: './items.csv' } }), 'task', '/tmp/project', {
projectCwd: '/tmp/project',
});
expect(mockUsageLogger.setMovement).toHaveBeenCalledWith('implement', 'arpeggio');
});
it('should pass movement type to usage logger for team leader movement', async () => {
await executePiece(
makeConfigWithMovement({ teamLeader: { output: { mode: 'summary' } } }),
'task',
'/tmp/project',
{
projectCwd: '/tmp/project',
},
);
expect(mockUsageLogger.setMovement).toHaveBeenCalledWith('implement', 'team_leader');
});
});

View File

@ -94,6 +94,20 @@ describe('providerEventLogger', () => {
expect(parsed.data['tool']).toBe('Read');
});
it('should keep provider-events filename suffix for backward compatibility', () => {
const logger = createProviderEventLogger({
logsDir: tempDir,
sessionId: 'session-compat',
runId: 'run-compat',
provider: 'claude',
movement: 'plan',
enabled: true,
});
expect(logger.filepath.endsWith('-provider-events.jsonl')).toBe(true);
expect(logger.filepath.endsWith('-usage-events.jsonl')).toBe(false);
});
it('should update movement and provider for subsequent events', () => {
const logger = createProviderEventLogger({
logsDir: tempDir,

View File

@ -318,6 +318,26 @@ describe('loadRunSessionContext', () => {
expect(context.movementLogs).toEqual([]);
});
it('should exclude usage-events log files', () => {
const slug = 'usage-events-run';
const runDir = createRunDir(tmpDir, slug, {
task: 'Usage events test',
piece: 'default',
status: 'completed',
startTime: '2026-02-01T00:00:00.000Z',
logsDirectory: `.takt/runs/${slug}/logs`,
reportDirectory: `.takt/runs/${slug}/reports`,
runSlug: slug,
});
// Only usage-events log file
writeFileSync(join(runDir, 'logs', 'session-001-usage-events.jsonl'), '{}', 'utf-8');
const context = loadRunSessionContext(tmpDir, slug);
expect(mockLoadNdjsonLog).not.toHaveBeenCalled();
expect(context.movementLogs).toEqual([]);
});
afterEach(() => {
rmSync(tmpDir, { recursive: true, force: true });
});

View File

@ -0,0 +1,273 @@
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { existsSync, mkdirSync, readFileSync, rmSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
// Local structural copies of the production types. The suite deliberately
// avoids importing the real declarations so that it asserts the *shape*
// contract of the module under test rather than sharing its types.
type ProviderType = 'claude' | 'codex' | 'opencode';
type MovementType = 'normal' | 'parallel' | 'arpeggio' | 'team_leader';
// Token counts reported by a provider; usageMissing=true means no usage data.
interface ProviderUsageSnapshot {
  readonly inputTokens?: number;
  readonly outputTokens?: number;
  readonly totalTokens?: number;
  readonly cachedInputTokens?: number;
  readonly usageMissing: boolean;
  readonly reason?: string;
}
// Construction options expected by createUsageEventLogger.
interface UsageEventLoggerConfig {
  readonly logsDir: string;
  readonly sessionId: string;
  readonly runId: string;
  readonly provider: ProviderType;
  readonly providerModel: string;
  readonly movement: string;
  readonly movementType: MovementType;
  readonly enabled: boolean;
}
// Public surface of the logger returned by createUsageEventLogger.
interface UsageEventLogger {
  readonly filepath: string;
  setMovement(movement: string, movementType: MovementType): void;
  setProvider(provider: ProviderType, providerModel: string): void;
  logUsage(params: {
    readonly success: boolean;
    readonly usage: ProviderUsageSnapshot;
    readonly timestamp?: Date;
  }): void;
}
interface UsageEventLoggerModule {
  createUsageEventLogger(config: UsageEventLoggerConfig): UsageEventLogger;
  isUsageEventsEnabled(config?: { logging?: { usageEvents?: boolean } }): boolean;
}
// Path assembled from segments — presumably to keep it out of static
// import analysis / vitest module mocking; TODO confirm intent.
const USAGE_EVENT_LOGGER_MODULE_PATH = ['..', 'shared', 'utils', 'usageEventLogger.js'].join('/');
// Loads the module under test dynamically and casts to the local contract.
async function loadUsageEventLoggerModule(): Promise<UsageEventLoggerModule> {
  return (await import(USAGE_EVENT_LOGGER_MODULE_PATH)) as UsageEventLoggerModule;
}
describe('usageEventLogger', () => {
  let tempDir: string;
  beforeEach(() => {
    // Unique temp dir per test so parallel/retried runs cannot collide.
    tempDir = join(tmpdir(), `takt-usage-events-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`);
    mkdirSync(tempDir, { recursive: true });
  });
  afterEach(() => {
    rmSync(tempDir, { recursive: true, force: true });
  });
  it('should disable usage events by default', async () => {
    const { isUsageEventsEnabled } = await loadUsageEventLoggerModule();
    // Missing config, empty config, and empty logging section all disable.
    expect(isUsageEventsEnabled()).toBe(false);
    expect(isUsageEventsEnabled({})).toBe(false);
    expect(isUsageEventsEnabled({ logging: {} })).toBe(false);
  });
  it('should enable usage events only when explicitly true', async () => {
    const { isUsageEventsEnabled } = await loadUsageEventLoggerModule();
    expect(isUsageEventsEnabled({ logging: { usageEvents: true } })).toBe(true);
    expect(isUsageEventsEnabled({ logging: { usageEvents: false } })).toBe(false);
  });
  it('should write usage event records with required fields', async () => {
    const { createUsageEventLogger } = await loadUsageEventLoggerModule();
    const logger = createUsageEventLogger({
      logsDir: tempDir,
      sessionId: 'session-1',
      runId: 'run-1',
      provider: 'codex',
      providerModel: 'gpt-5-codex',
      movement: 'implement',
      movementType: 'normal',
      enabled: true,
    });
    logger.logUsage({
      success: true,
      usage: {
        inputTokens: 12,
        outputTokens: 8,
        totalTokens: 20,
        cachedInputTokens: 4,
        usageMissing: false,
      },
      timestamp: new Date('2026-03-04T12:00:00.000Z'),
    });
    expect(existsSync(logger.filepath)).toBe(true);
    // One NDJSON line per logUsage call, snake_case keys on disk.
    const line = readFileSync(logger.filepath, 'utf-8').trim();
    const parsed = JSON.parse(line) as {
      run_id: string;
      session_id: string;
      provider: ProviderType;
      provider_model: string;
      movement: string;
      movement_type: MovementType;
      timestamp: string;
      success: boolean;
      usage_missing: boolean;
      reason?: string;
      usage: {
        input_tokens?: number;
        output_tokens?: number;
        total_tokens?: number;
        cached_input_tokens?: number;
      };
    };
    expect(parsed.run_id).toBe('run-1');
    expect(parsed.session_id).toBe('session-1');
    expect(parsed.provider).toBe('codex');
    expect(parsed.provider_model).toBe('gpt-5-codex');
    expect(parsed.movement).toBe('implement');
    expect(parsed.movement_type).toBe('normal');
    expect(parsed.success).toBe(true);
    expect(parsed.usage_missing).toBe(false);
    expect(parsed.timestamp).toBe('2026-03-04T12:00:00.000Z');
    expect(parsed.usage.input_tokens).toBe(12);
    expect(parsed.usage.output_tokens).toBe(8);
    expect(parsed.usage.total_tokens).toBe(20);
    expect(parsed.usage.cached_input_tokens).toBe(4);
  });
  it('should write usage_missing and reason when provider usage is unavailable', async () => {
    const { createUsageEventLogger } = await loadUsageEventLoggerModule();
    const logger = createUsageEventLogger({
      logsDir: tempDir,
      sessionId: 'session-2',
      runId: 'run-2',
      provider: 'opencode',
      providerModel: 'openai/gpt-4.1',
      movement: 'implement',
      movementType: 'normal',
      enabled: true,
    });
    logger.logUsage({
      success: true,
      usage: {
        usageMissing: true,
        reason: 'usage_not_supported_by_provider',
      },
    });
    const line = readFileSync(logger.filepath, 'utf-8').trim();
    const parsed = JSON.parse(line) as {
      provider: ProviderType;
      usage_missing: boolean;
      reason?: string;
      usage: {
        input_tokens?: number;
        output_tokens?: number;
        total_tokens?: number;
        cached_input_tokens?: number;
      };
    };
    expect(parsed.provider).toBe('opencode');
    expect(parsed.usage_missing).toBe(true);
    expect(parsed.reason).toBe('usage_not_supported_by_provider');
    // Missing-usage records carry an empty usage object by contract.
    expect(parsed.usage).toEqual({});
  });
  it('should update movement and provider metadata for subsequent records', async () => {
    const { createUsageEventLogger } = await loadUsageEventLoggerModule();
    const logger = createUsageEventLogger({
      logsDir: tempDir,
      sessionId: 'session-3',
      runId: 'run-3',
      provider: 'claude',
      providerModel: 'sonnet',
      movement: 'plan',
      movementType: 'normal',
      enabled: true,
    });
    logger.logUsage({
      success: true,
      usage: { inputTokens: 1, outputTokens: 2, totalTokens: 3, usageMissing: false },
    });
    // Metadata changes apply only to records written afterwards.
    logger.setMovement('implement', 'parallel');
    logger.setProvider('codex', 'gpt-5-codex');
    logger.logUsage({
      success: true,
      usage: { inputTokens: 4, outputTokens: 5, totalTokens: 9, usageMissing: false },
    });
    const lines = readFileSync(logger.filepath, 'utf-8').trim().split('\n');
    expect(lines).toHaveLength(2);
    const first = JSON.parse(lines[0] ?? '{}') as { provider: ProviderType; provider_model: string; movement: string; movement_type: MovementType };
    const second = JSON.parse(lines[1] ?? '{}') as { provider: ProviderType; provider_model: string; movement: string; movement_type: MovementType };
    expect(first.provider).toBe('claude');
    expect(first.provider_model).toBe('sonnet');
    expect(first.movement).toBe('plan');
    expect(first.movement_type).toBe('normal');
    expect(second.provider).toBe('codex');
    expect(second.provider_model).toBe('gpt-5-codex');
    expect(second.movement).toBe('implement');
    expect(second.movement_type).toBe('parallel');
  });
  it('should not write records when disabled', async () => {
    const { createUsageEventLogger } = await loadUsageEventLoggerModule();
    const logger = createUsageEventLogger({
      logsDir: tempDir,
      sessionId: 'session-disabled',
      runId: 'run-disabled',
      provider: 'claude',
      providerModel: 'sonnet',
      movement: 'plan',
      movementType: 'normal',
      enabled: false,
    });
    logger.logUsage({
      success: true,
      usage: { inputTokens: 1, outputTokens: 1, totalTokens: 2, usageMissing: false },
    });
    // Disabled loggers must never create the file.
    expect(existsSync(logger.filepath)).toBe(false);
  });
  it('should report file write failures to stderr only once', async () => {
    const { createUsageEventLogger } = await loadUsageEventLoggerModule();
    // Point at a non-existent nested dir so appendFileSync fails with ENOENT.
    const logger = createUsageEventLogger({
      logsDir: join(tempDir, 'missing', 'nested'),
      sessionId: 'session-err',
      runId: 'run-err',
      provider: 'claude',
      providerModel: 'sonnet',
      movement: 'plan',
      movementType: 'normal',
      enabled: true,
    });
    const stderrSpy = vi.spyOn(process.stderr, 'write').mockImplementation(() => true);
    try {
      logger.logUsage({
        success: true,
        usage: { inputTokens: 1, outputTokens: 1, totalTokens: 2, usageMissing: false },
      });
      logger.logUsage({
        success: true,
        usage: { inputTokens: 2, outputTokens: 2, totalTokens: 4, usageMissing: false },
      });
      // Second failure is swallowed silently: only one stderr report.
      expect(stderrSpy).toHaveBeenCalledTimes(1);
      expect(stderrSpy.mock.calls[0]?.[0]).toContain('Failed to write usage event log');
    } finally {
      stderrSpy.mockRestore();
    }
  });
});

View File

@ -0,0 +1,11 @@
/** Filename suffix for provider-event NDJSON logs (kept stable for backward compatibility). */
export const PROVIDER_EVENTS_LOG_FILE_SUFFIX = '-provider-events.jsonl';
/** Filename suffix for usage-event NDJSON logs. */
export const USAGE_EVENTS_LOG_FILE_SUFFIX = '-usage-events.jsonl';
/** Canonical reason codes recorded when a provider reports no usage data. */
export const USAGE_MISSING_REASONS = {
  NOT_AVAILABLE: 'usage_not_available',
  TOKENS_MISSING: 'usage_tokens_missing',
  NOT_SUPPORTED_BY_PROVIDER: 'usage_not_supported_by_provider',
} as const;
/** Union of the reason string literals declared in USAGE_MISSING_REASONS. */
export type UsageMissingReason =
  (typeof USAGE_MISSING_REASONS)[keyof typeof USAGE_MISSING_REASONS];

View File

@ -0,0 +1,176 @@
import type { ProviderType, StreamEvent } from '../../shared/types/provider.js';
import type { ProviderUsageSnapshot } from '../models/response.js';
import { USAGE_MISSING_REASONS, type UsageMissingReason } from './contracts.js';
/** Classification of the movement that produced an event. */
export type MovementType = 'normal' | 'parallel' | 'arpeggio' | 'team_leader';
/**
 * One NDJSON line in a provider-events log: a raw provider stream event plus
 * run/movement context and any correlation ids lifted out of the payload.
 */
export interface ProviderEventLogRecord {
  timestamp: string;
  provider: ProviderType;
  event_type: string;
  run_id: string;
  movement: string;
  // Correlation ids are present only when found in the event payload.
  session_id?: string;
  message_id?: string;
  call_id?: string;
  request_id?: string;
  data: Record<string, unknown>;
}
/**
 * One NDJSON line in a usage-events log. When usage_missing is true, `reason`
 * is set and `usage` is an empty object; otherwise the token counts are set.
 */
export interface UsageEventLogRecord {
  run_id: string;
  session_id: string;
  provider: ProviderType;
  provider_model: string;
  movement: string;
  movement_type: MovementType;
  timestamp: string;
  success: boolean;
  usage_missing: boolean;
  reason?: UsageMissingReason;
  usage: {
    input_tokens?: number;
    output_tokens?: number;
    total_tokens?: number;
    cached_input_tokens?: number;
  };
}
/** Run/provider context attached to every usage record. */
interface UsageEventMeta {
  runId: string;
  sessionId: string;
  provider: ProviderType;
  providerModel: string;
  movement: string;
  movementType: MovementType;
}
/** Per-call inputs for buildUsageEventRecord; timestamp defaults to "now". */
interface BuildUsageRecordParams {
  success: boolean;
  usage: ProviderUsageSnapshot;
  timestamp?: Date;
}
// Truncation thresholds for string payloads written to event logs.
const MAX_TEXT_LENGTH = 10_000;
const HEAD_LENGTH = 5_000;
const TAIL_LENGTH = 2_000;
const TRUNCATED_MARKER = '...[truncated]';

/**
 * Caps an over-long string at a head + marker + tail excerpt so individual
 * log records stay bounded. Strings of MAX_TEXT_LENGTH characters or fewer
 * pass through unchanged.
 */
function truncateString(value: string): string {
  if (value.length > MAX_TEXT_LENGTH) {
    const head = value.slice(0, HEAD_LENGTH);
    const tail = value.slice(-TAIL_LENGTH);
    return `${head}${TRUNCATED_MARKER}${tail}`;
  }
  return value;
}
/**
 * Shallow-copies an event payload, truncating top-level string values via
 * truncateString. Non-string values — including nested objects, whose inner
 * strings are NOT truncated — are passed through unchanged.
 */
function sanitizeData(data: Record<string, unknown>): Record<string, unknown> {
  const sanitized: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(data)) {
    sanitized[key] = typeof value === 'string' ? truncateString(value) : value;
  }
  return sanitized;
}
/**
 * Returns the first non-empty string found in `source` under any of `keys`
 * (checked in order), or undefined when no key holds a non-empty string.
 */
function pickString(source: Record<string, unknown>, keys: string[]): string | undefined {
  for (const candidate of keys) {
    const raw = source[candidate];
    if (typeof raw === 'string' && raw !== '') {
      return raw;
    }
  }
  return undefined;
}
/**
 * Throws unless `value` is a finite number; rejects undefined, NaN and
 * +/-Infinity token counts before a usage record is written.
 * NOTE(review): the message says "is required" even for NaN/Infinity inputs —
 * confirm that wording is intentional before changing it.
 */
function assertFiniteNumber(value: number | undefined, field: string): void {
  const isValid = typeof value === 'number' && Number.isFinite(value);
  if (isValid) {
    return;
  }
  throw new Error(`[usage-events] ${field} is required`);
}
/**
 * Narrows a free-form string to a known UsageMissingReason, throwing for any
 * value outside the USAGE_MISSING_REASONS contract.
 */
function assertUsageMissingReason(value: string): UsageMissingReason {
  for (const candidate of Object.values(USAGE_MISSING_REASONS)) {
    if (candidate === value) {
      return candidate;
    }
  }
  throw new Error('[usage-events] reason is invalid');
}
/**
 * Converts a raw provider StreamEvent into a flat ProviderEventLogRecord.
 *
 * String values in the payload are truncated via sanitizeData so records stay
 * bounded. Well-known correlation ids (session/message/call/request) are
 * lifted to top-level fields when found in the payload; absent ids are
 * omitted entirely via conditional spread rather than written as undefined,
 * which keeps the serialized NDJSON key set minimal and stable.
 */
export function normalizeProviderEvent(
  event: StreamEvent,
  provider: ProviderType,
  movement: string,
  runId: string
): ProviderEventLogRecord {
  // NOTE(review): the double assertion assumes event.data is a plain object —
  // confirm against StreamEvent's declaration.
  const data = sanitizeData(event.data as unknown as Record<string, unknown>);
  // Each id is picked from several provider-specific key spellings, in order.
  const sessionId = pickString(data, ['session_id', 'sessionId', 'sessionID', 'thread_id', 'threadId']);
  const messageId = pickString(data, ['message_id', 'messageId', 'item_id', 'itemId']);
  const callId = pickString(data, ['call_id', 'callId', 'id']);
  const requestId = pickString(data, ['request_id', 'requestId']);
  return {
    timestamp: new Date().toISOString(),
    provider,
    event_type: event.type,
    run_id: runId,
    movement,
    ...(sessionId ? { session_id: sessionId } : {}),
    ...(messageId ? { message_id: messageId } : {}),
    ...(callId ? { call_id: callId } : {}),
    ...(requestId ? { request_id: requestId } : {}),
    data,
  };
}
/**
 * Builds a usage-event NDJSON record from run metadata plus a provider usage
 * snapshot.
 *
 * Throws when usageMissing is set without a valid reason, or when any of the
 * mandatory token counts is absent/non-finite on a non-missing snapshot.
 * Missing-usage records carry an empty `usage` object by contract.
 */
export function buildUsageEventRecord(
  meta: UsageEventMeta,
  params: BuildUsageRecordParams
): UsageEventLogRecord {
  // Fields shared by both record shapes; timestamp defaults to "now".
  const base = {
    run_id: meta.runId,
    session_id: meta.sessionId,
    provider: meta.provider,
    provider_model: meta.providerModel,
    movement: meta.movement,
    movement_type: meta.movementType,
    timestamp: (params.timestamp ?? new Date()).toISOString(),
    success: params.success,
  };
  const snapshot = params.usage;
  if (snapshot.usageMissing) {
    const reason = snapshot.reason;
    if (typeof reason !== 'string' || reason === '') {
      throw new Error('[usage-events] reason is required when usageMissing=true');
    }
    return {
      ...base,
      usage_missing: true,
      reason: assertUsageMissingReason(reason),
      usage: {},
    };
  }
  // Token totals are mandatory once the provider reported usage at all.
  assertFiniteNumber(snapshot.inputTokens, 'usage.inputTokens');
  assertFiniteNumber(snapshot.outputTokens, 'usage.outputTokens');
  assertFiniteNumber(snapshot.totalTokens, 'usage.totalTokens');
  const tokenCounts: UsageEventLogRecord['usage'] = {
    input_tokens: snapshot.inputTokens,
    output_tokens: snapshot.outputTokens,
    total_tokens: snapshot.totalTokens,
  };
  // cached_input_tokens is optional: include it only when finite.
  if (typeof snapshot.cachedInputTokens === 'number' && Number.isFinite(snapshot.cachedInputTokens)) {
    tokenCounts.cached_input_tokens = snapshot.cachedInputTokens;
  }
  return {
    ...base,
    usage_missing: false,
    usage: tokenCounts,
  };
}

View File

@ -0,0 +1,87 @@
import { appendFileSync } from 'node:fs';
import { join } from 'node:path';
import type { ProviderType, StreamCallback, StreamEvent } from '../../shared/types/provider.js';
import { PROVIDER_EVENTS_LOG_FILE_SUFFIX } from './contracts.js';
import { normalizeProviderEvent } from './providerEvent.js';
/** Construction options for the provider event logger. */
export interface ProviderEventLoggerConfig {
  logsDir: string;
  sessionId: string;
  runId: string;
  provider: ProviderType;
  movement: string;
  // When false the logger is inert: no validation, no file writes.
  enabled: boolean;
}
/** Logging facade bound to a single NDJSON file for one session. */
export interface ProviderEventLogger {
  readonly filepath: string;
  // Updates the movement stamped on subsequently logged events.
  setMovement(movement: string): void;
  // Updates the provider stamped on subsequently logged events.
  setProvider(provider: ProviderType): void;
  // Returns a callback that logs each event and then forwards to `original`.
  wrapCallback(original?: StreamCallback): StreamCallback;
}
/**
 * Guards a required string config field; throws with a
 * `[provider-events]`-prefixed message when the value is empty.
 */
function assertNonEmpty(value: string, field: string): void {
  if (value !== '') {
    return;
  }
  throw new Error(`[provider-events] ${field} is required`);
}
/**
 * Creates an NDJSON logger that mirrors provider stream events into
 * `<logsDir>/<sessionId>-provider-events.jsonl`.
 *
 * When `enabled` is false the logger is a transparent pass-through: config is
 * not validated, wrapCallback hands back the original callback (or a no-op),
 * and the filesystem is never touched. Write failures are reported to stderr
 * once; subsequent failures are swallowed to avoid flooding.
 */
export function createProviderEventLogger(config: ProviderEventLoggerConfig): ProviderEventLogger {
  if (config.enabled) {
    // Validate required fields only when the logger will actually write.
    for (const [field, value] of [
      ['logsDir', config.logsDir],
      ['sessionId', config.sessionId],
      ['runId', config.runId],
      ['movement', config.movement],
    ] as const) {
      assertNonEmpty(value, field);
    }
  }
  const filepath = join(config.logsDir, `${config.sessionId}${PROVIDER_EVENTS_LOG_FILE_SUFFIX}`);
  let currentMovement = config.movement;
  let currentProvider = config.provider;
  let writeFailureReported = false;

  const persist = (event: StreamEvent): void => {
    const record = normalizeProviderEvent(event, currentProvider, currentMovement, config.runId);
    try {
      appendFileSync(filepath, `${JSON.stringify(record)}\n`, 'utf-8');
    } catch (error) {
      // Report only the first failure so a broken path does not spam stderr.
      if (!writeFailureReported) {
        writeFailureReported = true;
        const message = error instanceof Error ? error.message : String(error);
        process.stderr.write(`[takt] Failed to write provider event log: ${message}\n`);
      }
    }
  };

  return {
    filepath,
    setMovement(nextMovement: string): void {
      assertNonEmpty(nextMovement, 'movement');
      currentMovement = nextMovement;
    },
    setProvider(nextProvider: ProviderType): void {
      currentProvider = nextProvider;
    },
    wrapCallback(original?: StreamCallback): StreamCallback {
      if (!config.enabled) {
        // Disabled: hand back the caller's callback untouched, or a no-op.
        return original ?? ((): void => {});
      }
      return (event: StreamEvent): void => {
        persist(event);
        original?.(event);
      };
    },
  };
}
/**
 * Provider event logging is opt-in: returns true only when the global config
 * explicitly sets `logging.providerEvents` to true. Missing config, missing
 * logging section, undefined, and false all disable it.
 */
export function isProviderEventsEnabled(config?: {
  logging?: {
    providerEvents?: boolean;
  };
}): boolean {
  const flag = config?.logging?.providerEvents;
  return flag === true;
}

View File

@ -0,0 +1,108 @@
import { appendFileSync } from 'node:fs';
import { join } from 'node:path';
import type { ProviderType } from '../../shared/types/provider.js';
import type { ProviderUsageSnapshot } from '../models/response.js';
import { USAGE_EVENTS_LOG_FILE_SUFFIX } from './contracts.js';
import {
buildUsageEventRecord,
type MovementType,
} from './providerEvent.js';
export interface UsageEventLoggerConfig {
readonly logsDir: string;
readonly sessionId: string;
readonly runId: string;
readonly provider: ProviderType;
readonly providerModel: string;
readonly movement: string;
readonly movementType: MovementType;
readonly enabled: boolean;
}
/** Handle returned by {@link createUsageEventLogger}. */
export interface UsageEventLogger {
  /** Path of the NDJSON usage events file. */
  readonly filepath: string;
  /** Points subsequent records at a new movement. */
  setMovement(movement: string, movementType: MovementType): void;
  /** Points subsequent records at a new provider/model pair. */
  setProvider(provider: ProviderType, providerModel: string): void;
  /** Appends one usage record for a completed movement execution. */
  logUsage(params: {
    /** Whether the movement finished successfully. */
    readonly success: boolean;
    /** Normalized provider usage payload for the movement. */
    readonly usage: ProviderUsageSnapshot;
    /** Optional record time override. */
    readonly timestamp?: Date;
  }): void;
}
/**
 * Throws when a required string config value is empty.
 * Note: only zero-length strings are rejected; whitespace-only values pass.
 */
function assertNonEmpty(value: string, field: string): void {
  if (value.length > 0) {
    return;
  }
  throw new Error(`[usage-events] ${field} is required`);
}
/**
 * Creates a logger that appends one NDJSON usage record per movement run to
 * `<logsDir>/<sessionId><USAGE_EVENTS_LOG_FILE_SUFFIX>`.
 *
 * Movement and provider identity are mutable so a single logger instance can
 * follow the piece as it transitions between movements and providers.
 * When `enabled` is false, `logUsage` is a no-op (the filepath is still
 * computed so callers may display it).
 *
 * @param config Construction parameters; required fields are validated only
 *   when logging is enabled.
 * @returns A {@link UsageEventLogger} handle bound to the computed filepath.
 * @throws Error via assertNonEmpty when enabled and a required field is empty.
 */
export function createUsageEventLogger(config: UsageEventLoggerConfig): UsageEventLogger {
  if (config.enabled) {
    assertNonEmpty(config.logsDir, 'logsDir');
    assertNonEmpty(config.sessionId, 'sessionId');
    assertNonEmpty(config.runId, 'runId');
    assertNonEmpty(config.providerModel, 'providerModel');
    assertNonEmpty(config.movement, 'movement');
  }
  const filepath = join(config.logsDir, `${config.sessionId}${USAGE_EVENTS_LOG_FILE_SUFFIX}`);
  // Mutable identity describing the currently executing movement/provider.
  let movement = config.movement;
  let movementType = config.movementType;
  let provider = config.provider;
  let providerModel = config.providerModel;
  // Report at most one failure so a persistent error does not flood stderr.
  let hasReportedWriteFailure = false;
  return {
    filepath,
    setMovement(nextMovement: string, nextMovementType: MovementType): void {
      assertNonEmpty(nextMovement, 'movement');
      movement = nextMovement;
      movementType = nextMovementType;
    },
    setProvider(nextProvider: ProviderType, nextProviderModel: string): void {
      assertNonEmpty(nextProviderModel, 'providerModel');
      provider = nextProvider;
      providerModel = nextProviderModel;
    },
    logUsage(params: {
      readonly success: boolean;
      readonly usage: ProviderUsageSnapshot;
      readonly timestamp?: Date;
    }): void {
      if (!config.enabled) {
        return;
      }
      try {
        // Build + serialize INSIDE the try: a logging failure — including a
        // record-construction or JSON serialization error — must never crash
        // the piece run; it is reported once to stderr instead. (Previously
        // only the appendFileSync call was guarded.)
        const record = buildUsageEventRecord(
          {
            runId: config.runId,
            sessionId: config.sessionId,
            provider,
            providerModel,
            movement,
            movementType,
          },
          params
        );
        appendFileSync(filepath, JSON.stringify(record) + '\n', 'utf-8');
      } catch (error) {
        if (hasReportedWriteFailure) {
          return;
        }
        hasReportedWriteFailure = true;
        const message = error instanceof Error ? error.message : String(error);
        process.stderr.write(`[takt] Failed to write usage event log: ${message}\n`);
      }
    },
  };
}
/**
 * Determines whether usage event logging is switched on.
 *
 * Requires an explicit `logging.usageEvents === true`; anything else —
 * including an absent config or logging section — disables the feature.
 */
export function isUsageEventsEnabled(config?: {
  logging?: {
    usageEvents?: boolean;
  };
}): boolean {
  if (!config?.logging) {
    return false;
  }
  return config.logging.usageEvents === true;
}

View File

@ -11,6 +11,7 @@ export type {
RuntimePrepareEntry,
PieceRuntimeConfig,
AgentResponse,
ProviderUsageSnapshot,
SessionState,
PartDefinition,
PartResult,

View File

@ -45,6 +45,8 @@ export interface LoggingConfig {
debug?: boolean;
/** Enable provider stream event logging (default: false when undefined) */
providerEvents?: boolean;
/** Enable usage event logging (default: false when undefined) */
usageEvents?: boolean;
}
/** Analytics configuration for local metrics collection */

View File

@ -4,6 +4,17 @@
import type { Status, RuleMatchMethod } from './status.js';
/**
 * Provider-reported token usage, normalized for TAKT observability.
 *
 * Token fields are optional because providers report different subsets.
 * `usageMissing` is the authoritative flag: when true, no usable token
 * counts were available and `reason` carries a machine-readable cause code.
 */
export interface ProviderUsageSnapshot {
  /** Tokens consumed by the input/prompt. */
  inputTokens?: number;
  /** Tokens produced in the output. */
  outputTokens?: number;
  /** Provider-reported total, or inputTokens + outputTokens when derived. */
  totalTokens?: number;
  /** Combined cached-input accounting (provider-specific breakdown below). */
  cachedInputTokens?: number;
  /** Tokens written to the prompt cache (reported by the Claude SDK). */
  cacheCreationInputTokens?: number;
  /** Tokens read from the prompt cache (reported by the Claude SDK). */
  cacheReadInputTokens?: number;
  /** True when the provider returned no usable usage payload. */
  usageMissing: boolean;
  /** Reason code explaining a missing payload (set when usageMissing). */
  reason?: string;
}
/** Response from an agent execution */
export interface AgentResponse {
persona: string;
@ -19,4 +30,6 @@ export interface AgentResponse {
matchedRuleMethod?: RuleMatchMethod;
/** Structured output returned by provider SDK (JSON Schema mode) */
structuredOutput?: Record<string, unknown>;
/** Provider-native usage payload normalized for TAKT observability */
providerUsage?: ProviderUsageSnapshot;
}

View File

@ -466,6 +466,7 @@ export const LoggingConfigSchema = z.object({
trace: z.boolean().optional(),
debug: z.boolean().optional(),
provider_events: z.boolean().optional(),
usage_events: z.boolean().optional(),
});
/** Analytics config schema */

View File

@ -16,6 +16,7 @@ export type {
// Agent response
export type {
AgentResponse,
ProviderUsageSnapshot,
} from './response.js';
// Session state (authoritative definition with createSessionState)

View File

@ -7,6 +7,10 @@
import { existsSync, readdirSync, readFileSync } from 'node:fs';
import { join } from 'node:path';
import {
PROVIDER_EVENTS_LOG_FILE_SUFFIX,
USAGE_EVENTS_LOG_FILE_SUFFIX,
} from '../../core/logging/contracts.js';
import { loadNdjsonLog } from '../../infra/fs/index.js';
import type { SessionLog } from '../../shared/utils/index.js';
@ -113,7 +117,11 @@ function findSessionLogFile(logsDir: string): string | null {
}
const files = readdirSync(logsDir).filter(
(f) => f.endsWith('.jsonl') && !f.includes('-provider-events'),
(f) => (
f.endsWith('.jsonl')
&& !f.endsWith(PROVIDER_EVENTS_LOG_FILE_SUFFIX)
&& !f.endsWith(USAGE_EVENTS_LOG_FILE_SUFFIX)
),
);
const first = files[0];

View File

@ -13,6 +13,8 @@ import { TaskPrefixWriter } from '../../../shared/ui/TaskPrefixWriter.js';
import { generateSessionId, createSessionLog, finalizeSessionLog, initNdjsonLog } from '../../../infra/fs/index.js';
import { createLogger, notifySuccess, notifyError, preventSleep, generateReportDir, isValidReportDirName, getDebugPromptsLogFile } from '../../../shared/utils/index.js';
import { createProviderEventLogger, isProviderEventsEnabled } from '../../../shared/utils/providerEventLogger.js';
import { createUsageEventLogger, isUsageEventsEnabled } from '../../../shared/utils/usageEventLogger.js';
import { USAGE_MISSING_REASONS } from '../../../core/logging/contracts.js';
import { getLabel } from '../../../shared/i18n/index.js';
import { buildRunPaths } from '../../../core/piece/run/run-paths.js';
import { resolveRuntimeConfig } from '../../../core/runtime/runtime-environment.js';
@ -24,7 +26,7 @@ import { AnalyticsEmitter } from './analyticsEmitter.js';
import { createOutputFns, createPrefixedStreamHandler } from './outputFns.js';
import { RunMetaManager } from './runMeta.js';
import { createIterationLimitHandler, createUserInputHandler } from './iterationLimitHandler.js';
import { assertTaskPrefixPair, truncate, formatElapsedTime } from './pieceExecutionUtils.js';
import { assertTaskPrefixPair, truncate, formatElapsedTime, detectMovementType } from './pieceExecutionUtils.js';
import { createTraceReportWriter } from './traceReportWriter.js';
import { sanitizeTextForStorage } from './traceReportRedaction.js';
export type { PieceExecutionResult, PieceExecutionOptions };
@ -46,7 +48,6 @@ export async function executePiece(
const isRetry = Boolean(options.startMovement || options.retryNote);
log.debug('Session mode', { isRetry, isWorktree: cwd !== projectCwd });
out.header(`${headerPrefix} ${pieceConfig.name}`);
const pieceSessionId = generateSessionId();
const runSlug = options.reportDirName ?? generateReportDir(task);
if (!isValidReportDirName(runSlug)) throw new Error(`Invalid reportDirName: ${runSlug}`);
@ -61,10 +62,7 @@ export async function executePiece(
displayRef.current.createHandler()(event);
};
const isWorktree = cwd !== projectCwd;
const globalConfig = resolvePieceConfigValues(
projectCwd,
['notificationSound', 'notificationSoundEvents', 'provider', 'runtime', 'preventSleep', 'model', 'logging', 'analytics'],
);
const globalConfig = resolvePieceConfigValues(projectCwd, ['notificationSound', 'notificationSoundEvents', 'provider', 'runtime', 'preventSleep', 'model', 'logging', 'analytics']);
const traceReportMode = globalConfig.logging?.trace === true ? 'full' : 'redacted';
const allowSensitiveData = traceReportMode === 'full';
const ndjsonLogPath = initNdjsonLog(
@ -74,9 +72,7 @@ export async function executePiece(
{ logsDir: runPaths.logsAbs },
);
const sessionLogger = new SessionLogger(ndjsonLogPath, allowSensitiveData);
if (options.interactiveMetadata) {
sessionLogger.writeInteractiveMetadata(options.interactiveMetadata);
}
if (options.interactiveMetadata) sessionLogger.writeInteractiveMetadata(options.interactiveMetadata);
const shouldNotify = globalConfig.notificationSound !== false;
const nse = globalConfig.notificationSoundEvents;
const shouldNotifyIterationLimit = shouldNotify && nse?.iterationLimit !== false;
@ -98,6 +94,16 @@ export async function executePiece(
movement: options.startMovement ?? pieceConfig.initialMovement,
enabled: isProviderEventsEnabled(globalConfig),
});
const usageEventLogger = createUsageEventLogger({
logsDir: runPaths.logsAbs,
sessionId: pieceSessionId,
runId: runSlug,
provider: currentProvider,
providerModel: configuredModel ?? '(default)',
movement: options.startMovement ?? pieceConfig.initialMovement,
movementType: 'normal',
enabled: isUsageEventsEnabled(globalConfig),
});
initAnalyticsWriter(globalConfig.analytics?.enabled === true, globalConfig.analytics?.eventsPath ?? join(getGlobalConfigDir(), 'analytics', 'events'));
if (globalConfig.preventSleep) preventSleep();
const analyticsEmitter = new AnalyticsEmitter(runSlug, currentProvider, configuredModel ?? '(default)');
@ -107,7 +113,6 @@ export async function executePiece(
const sessionUpdateHandler = isWorktree
? (personaName: string, personaSessionId: string) => updateWorktreeSession(projectCwd, cwd, personaName, personaSessionId, currentProvider)
: (persona: string, personaSessionId: string) => updatePersonaSession(projectCwd, persona, personaSessionId, currentProvider);
const iterationLimitHandler = createIterationLimitHandler(
out,
displayRef,
@ -121,7 +126,6 @@ export async function executePiece(
},
);
const onUserInput = interactiveUserInput ? createUserInputHandler(out, displayRef) : undefined;
let abortReason: string | undefined;
let exceededInfo: ExceededInfo | undefined;
let lastMovementContent: string | undefined;
@ -174,13 +178,11 @@ export async function executePiece(
log.debug('Phase starting', { step: step.name, phase, phaseName });
sessionLogger.onPhaseStart(step, phase, phaseName, instruction, promptParts, phaseExecutionId, iteration);
});
engine.on('phase:complete', (step, phase, phaseName, content, phaseStatus, phaseError, phaseExecutionId, iteration) => {
log.debug('Phase completed', { step: step.name, phase, phaseName, status: phaseStatus });
sessionLogger.setIteration(currentIteration);
sessionLogger.onPhaseComplete(step, phase, phaseName, content, phaseStatus, phaseError, phaseExecutionId, iteration);
});
engine.on('phase:judge_stage', (step, phase, phaseName, entry, phaseExecutionId, iteration) => {
sessionLogger.onJudgeStage(step, phase, phaseName, entry, phaseExecutionId, iteration);
});
@ -195,6 +197,8 @@ export async function executePiece(
const movementModel = providerInfo.model ?? (movementProvider === currentProvider ? configuredModel : undefined) ?? '(default)';
providerEventLogger.setMovement(step.name);
providerEventLogger.setProvider(movementProvider);
usageEventLogger.setMovement(step.name, detectMovementType(step));
usageEventLogger.setProvider(movementProvider, movementModel);
out.info(`Provider: ${movementProvider}`);
out.info(`Model: ${movementModel}`);
if (instruction) log.debug('Step instruction', instruction);
@ -210,7 +214,6 @@ export async function executePiece(
}
sessionLogger.onMovementStart(step, iteration, instruction);
});
engine.on('movement:complete', (step, response, instruction) => {
log.debug('Movement completed', { step: step.name, status: response.status, matchedRuleIndex: response.matchedRuleIndex, matchedRuleMethod: response.matchedRuleMethod, contentLength: response.content.length, sessionId: response.sessionId, error: response.error });
lastMovementContent = response.content;
@ -227,17 +230,16 @@ export async function executePiece(
}
if (response.error) out.error(`Error: ${response.error}`);
if (response.sessionId) out.status('Session', response.sessionId);
usageEventLogger.logUsage({ success: response.status === 'done', usage: response.providerUsage ?? { usageMissing: true, reason: USAGE_MISSING_REASONS.NOT_AVAILABLE } });
sessionLogger.onMovementComplete(step, response, instruction);
analyticsEmitter.onMovementComplete(step, response);
sessionLog = { ...sessionLog, iterations: sessionLog.iterations + 1 };
});
engine.on('movement:report', (step, filePath, fileName) => {
out.logLine(`\n📄 Report: ${fileName}\n`);
out.logLine(readFileSync(filePath, 'utf-8'));
analyticsEmitter.onMovementReport(step, filePath);
});
engine.on('piece:complete', (state) => {
log.info('Piece completed successfully', { iterations: state.iteration });
sessionLog = finalizeSessionLog(sessionLog, 'completed');
@ -256,7 +258,6 @@ export async function executePiece(
out.info(`Session log: ${ndjsonLogPath}`);
if (shouldNotifyPieceComplete) notifySuccess('TAKT', getLabel('piece.notifyComplete', undefined, { iteration: String(state.iteration) }));
});
engine.on('piece:abort', (state, reason) => {
interruptAllQueries();
log.error('Piece aborted', { reason, iterations: state.iteration });
@ -280,7 +281,6 @@ export async function executePiece(
out.info(`Session log: ${ndjsonLogPath}`);
if (shouldNotifyPieceAbort) notifyError('TAKT', getLabel('piece.notifyAbort', undefined, { reason }));
});
const finalState = await engine.run();
return {
success: finalState.status === 'completed',

View File

@ -18,3 +18,10 @@ export function formatElapsedTime(startTime: string, endTime: string): string {
}
return `${Math.floor(elapsedSec / 60)}m ${Math.floor(elapsedSec % 60)}s`;
}
/**
 * Classifies a movement step by its execution mode.
 *
 * Checks the mode markers in priority order — parallel, then arpeggio, then
 * teamLeader (truthiness test) — and falls back to 'normal' when none is set.
 */
export function detectMovementType(step: { parallel?: unknown; arpeggio?: unknown; teamLeader?: unknown }): 'normal' | 'parallel' | 'arpeggio' | 'team_leader' {
  if (step.parallel) {
    return 'parallel';
  }
  if (step.arpeggio) {
    return 'arpeggio';
  }
  return step.teamLeader ? 'team_leader' : 'normal';
}

View File

@ -79,6 +79,7 @@ export class ClaudeClient {
sessionId: result.sessionId,
error: result.error,
structuredOutput: result.structuredOutput,
providerUsage: result.providerUsage,
};
}
@ -108,6 +109,7 @@ export class ClaudeClient {
sessionId: result.sessionId,
error: result.error,
structuredOutput: result.structuredOutput,
providerUsage: result.providerUsage,
};
}
@ -158,6 +160,7 @@ export class ClaudeClient {
sessionId: result.sessionId,
error: result.error,
structuredOutput: result.structuredOutput,
providerUsage: result.providerUsage,
};
}

View File

@ -11,6 +11,8 @@ import {
type SDKResultMessage,
type SDKAssistantMessage,
} from '@anthropic-ai/claude-agent-sdk';
import { USAGE_MISSING_REASONS } from '../../core/logging/contracts.js';
import type { ProviderUsageSnapshot } from '../../core/models/response.js';
import { createLogger, getErrorMessage } from '../../shared/utils/index.js';
import {
generateQueryId,
@ -26,6 +28,55 @@ import type {
const log = createLogger('claude-sdk');
/** Narrows an unknown value to a finite number; anything else yields undefined. */
function toNumber(value: unknown): number | undefined {
  if (typeof value !== 'number') {
    return undefined;
  }
  return Number.isFinite(value) ? value : undefined;
}
/**
 * Normalizes the `usage` payload of a Claude SDK result message into a
 * ProviderUsageSnapshot.
 *
 * A missing/non-object payload yields usageMissing with NOT_AVAILABLE; a
 * payload without numeric input/output counts yields TOKENS_MISSING.
 * Cached-input accounting: when both cache_creation and cache_read counts
 * are present their sum is reported as cachedInputTokens, otherwise
 * whichever single count exists.
 */
function extractProviderUsage(resultMsg: SDKResultMessage): ProviderUsageSnapshot {
  const rawUsage = (resultMsg as unknown as { usage?: unknown }).usage;
  if (!rawUsage || typeof rawUsage !== 'object') {
    return { usageMissing: true, reason: USAGE_MISSING_REASONS.NOT_AVAILABLE };
  }
  const usage = rawUsage as Record<string, unknown>;
  const inputTokens = toNumber(usage.input_tokens);
  const outputTokens = toNumber(usage.output_tokens);
  if (inputTokens === undefined || outputTokens === undefined) {
    return { usageMissing: true, reason: USAGE_MISSING_REASONS.TOKENS_MISSING };
  }
  const creation = toNumber(usage.cache_creation_input_tokens);
  const read = toNumber(usage.cache_read_input_tokens);
  let cached: number | undefined;
  if (creation !== undefined && read !== undefined) {
    cached = creation + read;
  } else {
    cached = read ?? creation;
  }
  const snapshot: ProviderUsageSnapshot = {
    inputTokens,
    outputTokens,
    totalTokens: inputTokens + outputTokens,
    usageMissing: false,
  };
  if (cached !== undefined) {
    snapshot.cachedInputTokens = cached;
  }
  if (creation !== undefined) {
    snapshot.cacheCreationInputTokens = creation;
  }
  if (read !== undefined) {
    snapshot.cacheReadInputTokens = read;
  }
  return snapshot;
}
/**
* Executes Claude queries using the Agent SDK.
*
@ -95,6 +146,7 @@ export class QueryExecutor {
let hasResultMessage = false;
let accumulatedAssistantText = '';
let structuredOutput: Record<string, unknown> | undefined;
let providerUsage: ProviderUsageSnapshot | undefined;
let onExternalAbort: (() => void) | undefined;
try {
@ -138,6 +190,7 @@ export class QueryExecutor {
if (message.type === 'result') {
hasResultMessage = true;
const resultMsg = message as SDKResultMessage;
providerUsage = extractProviderUsage(resultMsg);
if (resultMsg.subtype === 'success') {
resultContent = resultMsg.result;
const rawStructuredOutput = (resultMsg as unknown as {
@ -176,13 +229,18 @@ export class QueryExecutor {
hasResultMessage,
});
return {
const response: ClaudeResult = {
success,
content: finalContent.trim(),
sessionId,
fullContent: accumulatedAssistantText.trim(),
structuredOutput,
providerUsage: providerUsage ?? {
usageMissing: true,
reason: USAGE_MISSING_REASONS.NOT_AVAILABLE,
},
};
return response;
} catch (error) {
if (onExternalAbort && options.abortSignal) {
options.abortSignal.removeEventListener('abort', onExternalAbort);

View File

@ -7,6 +7,7 @@
import type { PermissionUpdate, AgentDefinition, SandboxSettings } from '@anthropic-ai/claude-agent-sdk';
import type { PermissionMode, McpServerConfig } from '../../core/models/index.js';
import type { ProviderUsageSnapshot } from '../../core/models/response.js';
export type { SandboxSettings };
import type { PermissionResult } from '../../core/piece/index.js';
@ -113,6 +114,8 @@ export interface ClaudeResult {
fullContent?: string;
/** Structured output returned by Claude SDK */
structuredOutput?: Record<string, unknown>;
/** Provider-native usage payload normalized for TAKT observability */
providerUsage?: ProviderUsageSnapshot;
}
/** Extended result with query ID for concurrent execution */

View File

@ -5,7 +5,8 @@
*/
import { Codex, type TurnOptions } from '@openai/codex-sdk';
import type { AgentResponse } from '../../core/models/index.js';
import { USAGE_MISSING_REASONS } from '../../core/logging/contracts.js';
import type { AgentResponse, ProviderUsageSnapshot } from '../../core/models/index.js';
import { createLogger, getErrorMessage, createStreamDiagnostics, parseStructuredOutput, type StreamDiagnostics } from '../../shared/utils/index.js';
import { mapToCodexSandboxMode, type CodexCallOptions } from './types.js';
import {
@ -38,6 +39,49 @@ const CODEX_RETRYABLE_ERROR_PATTERNS = [
'fetch failed',
];
/** Returns the value when it is a finite number; otherwise undefined. */
function toNumber(value: unknown): number | undefined {
  if (typeof value === 'number' && Number.isFinite(value)) {
    return value;
  }
  return undefined;
}
/**
 * Normalizes the usage payload of a Codex `turn.completed` event into a
 * ProviderUsageSnapshot.
 *
 * A missing/non-object payload yields usageMissing with NOT_AVAILABLE.
 * Codex may supply an explicit total_tokens; when absent the total is
 * derived as input + output. If input, output, or total cannot be resolved
 * the result is usageMissing with TOKENS_MISSING.
 */
function extractProviderUsageFromTurnCompleted(event: CodexEvent): ProviderUsageSnapshot {
  const usageRaw = event.usage;
  if (!usageRaw || typeof usageRaw !== 'object') {
    return { usageMissing: true, reason: USAGE_MISSING_REASONS.NOT_AVAILABLE };
  }
  const usage = usageRaw as Record<string, unknown>;
  const inputTokens = toNumber(usage.input_tokens);
  const outputTokens = toNumber(usage.output_tokens);
  let totalTokens = toNumber(usage.total_tokens);
  if (totalTokens === undefined && inputTokens !== undefined && outputTokens !== undefined) {
    totalTokens = inputTokens + outputTokens;
  }
  if (inputTokens === undefined || outputTokens === undefined || totalTokens === undefined) {
    return { usageMissing: true, reason: USAGE_MISSING_REASONS.TOKENS_MISSING };
  }
  const snapshot: ProviderUsageSnapshot = {
    inputTokens,
    outputTokens,
    totalTokens,
    usageMissing: false,
  };
  const cachedInputTokens = toNumber(usage.cached_input_tokens);
  if (cachedInputTokens !== undefined) {
    snapshot.cachedInputTokens = cachedInputTokens;
  }
  return snapshot;
}
/**
* Client for Codex SDK agent interactions.
*
@ -167,6 +211,7 @@ export class CodexClient {
const contentOffsets = new Map<string, number>();
let success = true;
let failureMessage = '';
let providerUsage: ProviderUsageSnapshot | undefined;
const state = createStreamTrackingState();
for await (const event of events as AsyncGenerator<CodexEvent>) {
@ -180,6 +225,11 @@ export class CodexClient {
continue;
}
if (event.type === 'turn.completed') {
providerUsage = extractProviderUsageFromTurnCompleted(event);
continue;
}
if (event.type === 'turn.failed') {
success = false;
if (event.error && typeof event.error === 'object' && 'message' in event.error) {
@ -280,14 +330,19 @@ export class CodexClient {
const structuredOutput = parseStructuredOutput(trimmed, !!options.outputSchema);
emitResult(options.onStream, true, trimmed, currentThreadId);
return {
const response: AgentResponse = {
persona: agentType,
status: 'done',
content: trimmed,
timestamp: new Date(),
sessionId: currentThreadId,
structuredOutput,
providerUsage: providerUsage ?? {
usageMissing: true,
reason: USAGE_MISSING_REASONS.NOT_AVAILABLE,
},
};
return response;
} catch (error) {
const message = getErrorMessage(error);
const errorMessage = streamAbortController.signal.aborted

View File

@ -97,6 +97,7 @@ export class GlobalConfigManager {
trace: parsed.logging.trace,
debug: parsed.logging.debug,
providerEvents: parsed.logging.provider_events,
usageEvents: parsed.logging.usage_events,
} : undefined,
analytics: parsed.analytics ? {
enabled: parsed.analytics.enabled,

View File

@ -18,12 +18,14 @@ export function serializeGlobalConfig(config: PersistedGlobalConfig): Record<str
|| config.logging.trace !== undefined
|| config.logging.debug !== undefined
|| config.logging.providerEvents !== undefined
|| config.logging.usageEvents !== undefined
)) {
raw.logging = {
...(config.logging.level !== undefined ? { level: config.logging.level } : {}),
...(config.logging.trace !== undefined ? { trace: config.logging.trace } : {}),
...(config.logging.debug !== undefined ? { debug: config.logging.debug } : {}),
...(config.logging.providerEvents !== undefined ? { provider_events: config.logging.providerEvents } : {}),
...(config.logging.usageEvents !== undefined ? { usage_events: config.logging.usageEvents } : {}),
};
}
if (config.analytics) {

View File

@ -1,143 +1,8 @@
import { appendFileSync } from 'node:fs';
import { join } from 'node:path';
import type { ProviderType, StreamCallback, StreamEvent } from '../types/provider.js';
/** Construction parameters for createProviderEventLogger. */
export interface ProviderEventLoggerConfig {
  /** Directory the events file is written into. */
  logsDir: string;
  /** Session id; used as the log file name prefix. */
  sessionId: string;
  /** Run identifier stamped onto every record. */
  runId: string;
  /** Provider active at construction time (changeable later via setProvider). */
  provider: ProviderType;
  /** Movement active at construction time (changeable later via setMovement). */
  movement: string;
  /** When false, wrapCallback passes events through without logging. */
  enabled: boolean;
}
/** Handle returned by createProviderEventLogger. */
export interface ProviderEventLogger {
  /** Path of the NDJSON provider events file. */
  readonly filepath: string;
  /** Points subsequent records at a new movement. */
  setMovement(movement: string): void;
  /** Points subsequent records at a new provider. */
  setProvider(provider: ProviderType): void;
  /** Wraps a stream callback so each event is logged before being forwarded. */
  wrapCallback(original?: StreamCallback): StreamCallback;
}
/**
 * Shape of one NDJSON line in the provider events log.
 * Keys are snake_case to match the on-disk log schema.
 */
interface ProviderEventLogRecord {
  /** ISO-8601 time the record was written. */
  timestamp: string;
  /** Provider that emitted the event. */
  provider: ProviderType;
  /** Raw stream event type string. */
  event_type: string;
  /** Run identifier the event belongs to. */
  run_id: string;
  /** Movement that was executing when the event arrived. */
  movement: string;
  /** Session/thread id sniffed from the event payload, when present. */
  session_id?: string;
  /** Message/item id sniffed from the event payload, when present. */
  message_id?: string;
  /** Call id sniffed from the event payload, when present. */
  call_id?: string;
  /** Request id sniffed from the event payload, when present. */
  request_id?: string;
  /** Sanitized event payload (oversized string values truncated). */
  data: Record<string, unknown>;
}
// Truncation policy for oversized text fields in provider event logs:
// strings longer than MAX_TEXT_LENGTH keep their first HEAD_LENGTH and last
// TAIL_LENGTH characters around a marker.
const MAX_TEXT_LENGTH = 10_000;
const HEAD_LENGTH = 5_000;
const TAIL_LENGTH = 2_000;
const TRUNCATED_MARKER = '...[truncated]';

/**
 * Caps a string at the configured truncation policy; strings at or under
 * MAX_TEXT_LENGTH pass through unchanged.
 */
function truncateString(value: string): string {
  if (value.length > MAX_TEXT_LENGTH) {
    const head = value.slice(0, HEAD_LENGTH);
    const tail = value.slice(-TAIL_LENGTH);
    return head + TRUNCATED_MARKER + tail;
  }
  return value;
}
/**
 * Returns a shallow copy of data with every string value passed through
 * truncateString; non-string values are carried over untouched.
 */
function sanitizeData(data: Record<string, unknown>): Record<string, unknown> {
  const sanitized: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(data)) {
    sanitized[key] = typeof value === 'string' ? truncateString(value) : value;
  }
  return sanitized;
}
/**
 * Returns the first non-empty string found in source under any of the given
 * keys (checked in order), or undefined when none matches.
 */
function pickString(source: Record<string, unknown>, keys: string[]): string | undefined {
  for (const key of keys) {
    const candidate = source[key];
    if (typeof candidate !== 'string' || candidate.length === 0) {
      continue;
    }
    return candidate;
  }
  return undefined;
}
/**
 * Assembles a ProviderEventLogRecord from a stream event plus the current
 * logger identity (provider, movement, run).
 *
 * Correlation ids are sniffed from the sanitized event data under the naming
 * variants used by the different provider SDKs and are included only when a
 * non-empty string is found. Key order is fixed so serialized records keep a
 * stable column layout, with the payload last.
 */
function buildLogRecord(
  event: StreamEvent,
  provider: ProviderType,
  movement: string,
  runId: string,
): ProviderEventLogRecord {
  const data = sanitizeData(event.data as unknown as Record<string, unknown>);
  const ids: Partial<ProviderEventLogRecord> = {};
  const sessionId = pickString(data, ['session_id', 'sessionId', 'sessionID', 'thread_id', 'threadId']);
  if (sessionId) {
    ids.session_id = sessionId;
  }
  const messageId = pickString(data, ['message_id', 'messageId', 'item_id', 'itemId']);
  if (messageId) {
    ids.message_id = messageId;
  }
  // NOTE(review): plain 'id' is accepted as a call id fallback — verify it
  // cannot collide with non-call identifiers in provider payloads.
  const callId = pickString(data, ['call_id', 'callId', 'id']);
  if (callId) {
    ids.call_id = callId;
  }
  const requestId = pickString(data, ['request_id', 'requestId']);
  if (requestId) {
    ids.request_id = requestId;
  }
  return {
    timestamp: new Date().toISOString(),
    provider,
    event_type: event.type,
    run_id: runId,
    movement,
    ...ids,
    data,
  };
}
/**
 * Creates a logger that appends one NDJSON record per provider stream event
 * to `<logsDir>/<sessionId>-provider-events.jsonl`.
 *
 * Movement and provider are mutable so one logger instance can follow the
 * run across movements/providers. Logging failures never propagate: both
 * record construction and the file append happen inside the try, and only
 * the first failure is reported to stderr.
 *
 * @param config Construction parameters; `enabled: false` turns
 *   wrapCallback into a pass-through.
 * @returns A ProviderEventLogger handle bound to the computed filepath.
 */
export function createProviderEventLogger(config: ProviderEventLoggerConfig): ProviderEventLogger {
  const filepath = join(config.logsDir, `${config.sessionId}-provider-events.jsonl`);
  // Mutable identity describing the currently executing movement/provider.
  let movement = config.movement;
  let provider = config.provider;
  // Report at most one failure so a persistent error does not flood stderr.
  let hasReportedWriteFailure = false;
  const write = (event: StreamEvent): void => {
    try {
      // Build + serialize inside the try so serialization errors are also
      // swallowed (reported once) rather than crashing the stream callback.
      const record = buildLogRecord(event, provider, movement, config.runId);
      appendFileSync(filepath, JSON.stringify(record) + '\n', 'utf-8');
    } catch (error) {
      if (hasReportedWriteFailure) {
        return;
      }
      hasReportedWriteFailure = true;
      const message = error instanceof Error ? error.message : String(error);
      process.stderr.write(`[takt] Failed to write provider event log: ${message}\n`);
    }
  };
  return {
    filepath,
    setMovement(nextMovement: string): void {
      movement = nextMovement;
    },
    setProvider(nextProvider: ProviderType): void {
      provider = nextProvider;
    },
    wrapCallback(original?: StreamCallback): StreamCallback {
      // Disabled: hand back the caller's callback untouched (or a no-op
      // when none was supplied) so there is zero logging overhead.
      if (!config.enabled && original) {
        return original;
      }
      if (!config.enabled) {
        return () => {};
      }
      // Enabled: log first, then forward to the original callback.
      return (event: StreamEvent): void => {
        write(event);
        original?.(event);
      };
    },
  };
}
/**
 * True only when the logging config explicitly sets providerEvents to true;
 * an absent config, absent logging section, or any non-true value disables
 * provider event logging.
 */
export function isProviderEventsEnabled(config?: {
  logging?: {
    providerEvents?: boolean;
  };
}): boolean {
  if (!config?.logging) {
    return false;
  }
  return config.logging.providerEvents === true;
}
// Backward-compatibility shim: the provider event logger implementation moved
// to core/logging; re-export it here so existing shared/utils imports resolve.
export {
  createProviderEventLogger,
  isProviderEventsEnabled,
} from '../../core/logging/providerEventLogger.js';
export type {
  ProviderEventLogger,
  ProviderEventLoggerConfig,
} from '../../core/logging/providerEventLogger.js';

View File

@ -0,0 +1,8 @@
// Thin shared/utils facade over the core/logging usage event logger so
// callers outside core can import it without reaching into core paths.
export {
  createUsageEventLogger,
  isUsageEventsEnabled,
} from '../../core/logging/usageEventLogger.js';
export type {
  UsageEventLogger,
  UsageEventLoggerConfig,
} from '../../core/logging/usageEventLogger.js';