github-issue-244 (#250)

* dist-tag 検証をリトライ付きに変更(npm レジストリの結果整合性対策)

* takt run 実行時に蓋閉じスリープを抑制

* takt: github-issue-244

* takt: #244/implement-parallel-subtasks
This commit is contained in:
nrs 2026-02-12 11:51:34 +09:00 committed by GitHub
parent 41bde30adc
commit a82d6d9d8a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 950 additions and 5 deletions

View File

@ -0,0 +1,68 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { buildAbortSignal } from '../core/piece/engine/abort-signal.js';
describe('buildAbortSignal', () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
});
it('タイムアウトでabortされる', () => {
const { signal, dispose } = buildAbortSignal(100, undefined);
expect(signal.aborted).toBe(false);
vi.advanceTimersByTime(100);
expect(signal.aborted).toBe(true);
expect(signal.reason).toBeInstanceOf(Error);
expect((signal.reason as Error).message).toBe('Part timeout after 100ms');
dispose();
});
it('親シグナルがabortされると子シグナルへ伝搬する', () => {
const parent = new AbortController();
const { signal, dispose } = buildAbortSignal(1000, parent.signal);
const reason = new Error('parent aborted');
parent.abort(reason);
expect(signal.aborted).toBe(true);
expect(signal.reason).toBe(reason);
dispose();
});
it('disposeでタイマーと親リスナーを解放する', () => {
const parent = new AbortController();
const addSpy = vi.spyOn(parent.signal, 'addEventListener');
const removeSpy = vi.spyOn(parent.signal, 'removeEventListener');
const { signal, dispose } = buildAbortSignal(100, parent.signal);
expect(addSpy).toHaveBeenCalledTimes(1);
dispose();
vi.advanceTimersByTime(200);
expect(signal.aborted).toBe(false);
expect(removeSpy).toHaveBeenCalledTimes(1);
});
it('親シグナルが既にabort済みなら即時伝搬する', () => {
const parent = new AbortController();
const reason = new Error('already aborted');
const addSpy = vi.spyOn(parent.signal, 'addEventListener');
parent.abort(reason);
const { signal, dispose } = buildAbortSignal(1000, parent.signal);
expect(signal.aborted).toBe(true);
expect(signal.reason).toBe(reason);
expect(addSpy).not.toHaveBeenCalled();
dispose();
});
});

View File

@ -0,0 +1,172 @@
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { existsSync, rmSync } from 'node:fs';
import { runAgent } from '../agents/runner.js';
import { detectMatchedRule } from '../core/piece/evaluation/index.js';
import { PieceEngine } from '../core/piece/engine/PieceEngine.js';
import { makeMovement, makeRule, makeResponse, createTestTmpDir, applyDefaultMocks } from './engine-test-helpers.js';
import type { PieceConfig } from '../core/models/index.js';
vi.mock('../agents/runner.js', () => ({
runAgent: vi.fn(),
}));
vi.mock('../core/piece/evaluation/index.js', () => ({
detectMatchedRule: vi.fn(),
}));
vi.mock('../core/piece/phase-runner.js', () => ({
needsStatusJudgmentPhase: vi.fn().mockReturnValue(false),
runReportPhase: vi.fn().mockResolvedValue(undefined),
runStatusJudgmentPhase: vi.fn().mockResolvedValue(''),
}));
vi.mock('../shared/utils/index.js', async (importOriginal) => ({
...(await importOriginal<Record<string, unknown>>()),
generateReportDir: vi.fn().mockReturnValue('test-report-dir'),
}));
function buildTeamLeaderConfig(): PieceConfig {
return {
name: 'team-leader-piece',
initialMovement: 'implement',
maxMovements: 5,
movements: [
makeMovement('implement', {
instructionTemplate: 'Task: {task}',
teamLeader: {
persona: '../personas/team-leader.md',
maxParts: 3,
timeoutMs: 10000,
partPersona: '../personas/coder.md',
partAllowedTools: ['Read', 'Edit', 'Write'],
partEdit: true,
partPermissionMode: 'edit',
},
rules: [makeRule('done', 'COMPLETE')],
}),
],
};
}
describe('PieceEngine Integration: TeamLeaderRunner', () => {
let tmpDir: string;
beforeEach(() => {
vi.resetAllMocks();
applyDefaultMocks();
tmpDir = createTestTmpDir();
});
afterEach(() => {
if (existsSync(tmpDir)) {
rmSync(tmpDir, { recursive: true, force: true });
}
});
it('team leaderが分解したパートを並列実行し集約する', async () => {
const config = buildTeamLeaderConfig();
const engine = new PieceEngine(config, tmpDir, 'implement feature', { projectCwd: tmpDir });
vi.mocked(runAgent)
.mockResolvedValueOnce(makeResponse({
persona: 'team-leader',
content: [
'```json',
'[{"id":"part-1","title":"API","instruction":"Implement API"},{"id":"part-2","title":"Test","instruction":"Add tests"}]',
'```',
].join('\n'),
}))
.mockResolvedValueOnce(makeResponse({ persona: 'coder', content: 'API done' }))
.mockResolvedValueOnce(makeResponse({ persona: 'coder', content: 'Tests done' }));
vi.mocked(detectMatchedRule).mockResolvedValueOnce({ index: 0, method: 'phase1_tag' });
const state = await engine.run();
expect(state.status).toBe('completed');
expect(vi.mocked(runAgent)).toHaveBeenCalledTimes(3);
const output = state.movementOutputs.get('implement');
expect(output).toBeDefined();
expect(output!.content).toContain('## decomposition');
expect(output!.content).toContain('## part-1: API');
expect(output!.content).toContain('API done');
expect(output!.content).toContain('## part-2: Test');
expect(output!.content).toContain('Tests done');
});
it('全パートが失敗した場合はムーブメント失敗として中断する', async () => {
const config = buildTeamLeaderConfig();
const engine = new PieceEngine(config, tmpDir, 'implement feature', { projectCwd: tmpDir });
vi.mocked(runAgent)
.mockResolvedValueOnce(makeResponse({
persona: 'team-leader',
content: [
'```json',
'[{"id":"part-1","title":"API","instruction":"Implement API"},{"id":"part-2","title":"Test","instruction":"Add tests"}]',
'```',
].join('\n'),
}))
.mockResolvedValueOnce(makeResponse({ persona: 'coder', status: 'error', error: 'api failed' }))
.mockResolvedValueOnce(makeResponse({ persona: 'coder', status: 'error', error: 'test failed' }));
const state = await engine.run();
expect(state.status).toBe('aborted');
});
it('一部パートが失敗しても成功パートがあれば集約結果は完了する', async () => {
const config = buildTeamLeaderConfig();
const engine = new PieceEngine(config, tmpDir, 'implement feature', { projectCwd: tmpDir });
vi.mocked(runAgent)
.mockResolvedValueOnce(makeResponse({
persona: 'team-leader',
content: [
'```json',
'[{"id":"part-1","title":"API","instruction":"Implement API"},{"id":"part-2","title":"Test","instruction":"Add tests"}]',
'```',
].join('\n'),
}))
.mockResolvedValueOnce(makeResponse({ persona: 'coder', content: 'API done' }))
.mockResolvedValueOnce(makeResponse({ persona: 'coder', status: 'error', error: 'test failed' }));
vi.mocked(detectMatchedRule).mockResolvedValueOnce({ index: 0, method: 'phase1_tag' });
const state = await engine.run();
expect(state.status).toBe('completed');
const output = state.movementOutputs.get('implement');
expect(output).toBeDefined();
expect(output!.content).toContain('## part-1: API');
expect(output!.content).toContain('API done');
expect(output!.content).toContain('## part-2: Test');
expect(output!.content).toContain('[ERROR] test failed');
});
it('パート失敗時にerrorがなくてもcontentの詳細をエラー表示に使う', async () => {
const config = buildTeamLeaderConfig();
const engine = new PieceEngine(config, tmpDir, 'implement feature', { projectCwd: tmpDir });
vi.mocked(runAgent)
.mockResolvedValueOnce(makeResponse({
persona: 'team-leader',
content: [
'```json',
'[{"id":"part-1","title":"API","instruction":"Implement API"},{"id":"part-2","title":"Test","instruction":"Add tests"}]',
'```',
].join('\n'),
}))
.mockResolvedValueOnce(makeResponse({ persona: 'coder', status: 'error', content: 'api failed from content' }))
.mockResolvedValueOnce(makeResponse({ persona: 'coder', content: 'Tests done' }));
vi.mocked(detectMatchedRule).mockResolvedValueOnce({ index: 0, method: 'phase1_tag' });
const state = await engine.run();
expect(state.status).toBe('completed');
const output = state.movementOutputs.get('implement');
expect(output).toBeDefined();
expect(output!.content).toContain('[ERROR] api failed from content');
});
});

View File

@ -0,0 +1,76 @@
import { describe, it, expect } from 'vitest';
import { parseParts } from '../core/piece/engine/task-decomposer.js';
describe('parseParts', () => {
it('最後のjsonコードブロックをパースする', () => {
const content = [
'説明',
'```json',
'[{"id":"old","title":"old","instruction":"old"}]',
'```',
'最終案',
'```json',
'[{"id":"a","title":"A","instruction":"Do A"},{"id":"b","title":"B","instruction":"Do B","timeout_ms":1200}]',
'```',
].join('\n');
const result = parseParts(content, 3);
expect(result).toHaveLength(2);
expect(result[0]).toEqual({
id: 'a',
title: 'A',
instruction: 'Do A',
timeoutMs: undefined,
});
expect(result[1]!.timeoutMs).toBe(1200);
});
it('jsonコードブロックがない場合はエラー', () => {
expect(() => parseParts('no json', 3)).toThrow(
'Team leader output must include a ```json ... ``` block',
);
});
it('max_partsを超えたらエラー', () => {
const content = '```json\n[{"id":"a","title":"A","instruction":"Do A"},{"id":"b","title":"B","instruction":"Do B"}]\n```';
expect(() => parseParts(content, 1)).toThrow(
'Team leader produced too many parts: 2 > 1',
);
});
it('必須フィールドが不足したらエラー', () => {
const content = '```json\n[{"id":"a","title":"A"}]\n```';
expect(() => parseParts(content, 3)).toThrow(
'Part[0] "instruction" must be a non-empty string',
);
});
it('jsonコードブロックが配列でない場合はエラー', () => {
const content = '```json\n{"not":"array"}\n```';
expect(() => parseParts(content, 3)).toThrow(
'Team leader JSON must be an array',
);
});
it('空配列の場合はエラー', () => {
const content = '```json\n[]\n```';
expect(() => parseParts(content, 3)).toThrow(
'Team leader JSON must contain at least one part',
);
});
it('重複したpart idがある場合はエラー', () => {
const content = [
'```json',
'[{"id":"dup","title":"A","instruction":"Do A"},{"id":"dup","title":"B","instruction":"Do B"}]',
'```',
].join('\n');
expect(() => parseParts(content, 3)).toThrow('Duplicate part id: dup');
});
});

View File

@ -0,0 +1,105 @@
import { describe, it, expect } from 'vitest';
import { join } from 'node:path';
import { PieceMovementRawSchema } from '../core/models/schemas.js';
import { normalizePieceConfig } from '../infra/config/loaders/pieceParser.js';
describe('team_leader schema', () => {
it('max_parts <= 3 の設定を受け付ける', () => {
const raw = {
name: 'implement',
team_leader: {
persona: 'team-leader',
max_parts: 3,
timeout_ms: 120000,
},
instruction_template: 'decompose',
};
const result = PieceMovementRawSchema.safeParse(raw);
expect(result.success).toBe(true);
});
it('max_parts > 3 は拒否する', () => {
const raw = {
name: 'implement',
team_leader: {
max_parts: 4,
},
instruction_template: 'decompose',
};
const result = PieceMovementRawSchema.safeParse(raw);
expect(result.success).toBe(false);
});
it('parallel と team_leader の同時指定は拒否する', () => {
const raw = {
name: 'implement',
parallel: [{ name: 'sub', instruction_template: 'x' }],
team_leader: {
max_parts: 2,
},
instruction_template: 'decompose',
};
const result = PieceMovementRawSchema.safeParse(raw);
expect(result.success).toBe(false);
});
it('arpeggio と team_leader の同時指定は拒否する', () => {
const raw = {
name: 'implement',
arpeggio: {
source: 'csv',
source_path: './data.csv',
template: './prompt.md',
},
team_leader: {
max_parts: 2,
},
instruction_template: 'decompose',
};
const result = PieceMovementRawSchema.safeParse(raw);
expect(result.success).toBe(false);
});
});
describe('normalizePieceConfig team_leader', () => {
it('team_leader を内部形式へ正規化する', () => {
const pieceDir = join(process.cwd(), 'src', '__tests__');
const raw = {
name: 'piece',
movements: [
{
name: 'implement',
team_leader: {
persona: 'team-leader',
max_parts: 2,
timeout_ms: 90000,
part_persona: 'coder',
part_allowed_tools: ['Read', 'Edit'],
part_edit: true,
part_permission_mode: 'edit',
},
instruction_template: 'decompose',
},
],
};
const config = normalizePieceConfig(raw, pieceDir);
const movement = config.movements[0];
expect(movement).toBeDefined();
expect(movement!.teamLeader).toEqual({
persona: 'team-leader',
personaPath: undefined,
maxParts: 2,
timeoutMs: 90000,
partPersona: 'coder',
partPersonaPath: undefined,
partAllowedTools: ['Read', 'Edit'],
partEdit: true,
partPermissionMode: 'edit',
});
});
});

View File

@ -10,6 +10,9 @@ export type {
McpServerConfig, McpServerConfig,
AgentResponse, AgentResponse,
SessionState, SessionState,
PartDefinition,
PartResult,
TeamLeaderConfig,
PieceRule, PieceRule,
PieceMovement, PieceMovement,
ArpeggioMovementConfig, ArpeggioMovementConfig,

42
src/core/models/part.ts Normal file
View File

@ -0,0 +1,42 @@
import type { PermissionMode } from './status.js';
import type { AgentResponse } from './response.js';
/** Part definition produced by movement team leader agent */
export interface PartDefinition {
/** Unique ID inside the parent movement */
id: string;
/** Human-readable title */
title: string;
/** Instruction passed to the part agent */
instruction: string;
/** Optional per-part timeout in milliseconds */
timeoutMs?: number;
}
/** Result of a single part execution */
export interface PartResult {
part: PartDefinition;
response: AgentResponse;
}
/** team_leader config on a movement */
export interface TeamLeaderConfig {
/** Persona reference for the team leader agent */
persona?: string;
/** Resolved absolute path for team leader persona */
personaPath?: string;
/** Maximum number of parts to run in parallel */
maxParts: number;
/** Default timeout for parts in milliseconds */
timeoutMs: number;
/** Persona reference for part agents */
partPersona?: string;
/** Resolved absolute path for part persona */
partPersonaPath?: string;
/** Allowed tools for part agents */
partAllowedTools?: string[];
/** Whether part agents can edit files */
partEdit?: boolean;
/** Permission mode for part agents */
partPermissionMode?: PermissionMode;
}

View File

@ -5,6 +5,7 @@
import type { PermissionMode } from './status.js'; import type { PermissionMode } from './status.js';
import type { AgentResponse } from './response.js'; import type { AgentResponse } from './response.js';
import type { InteractiveMode } from './interactive-mode.js'; import type { InteractiveMode } from './interactive-mode.js';
import type { TeamLeaderConfig } from './part.js';
/** Rule-based transition configuration (unified format) */ /** Rule-based transition configuration (unified format) */
export interface PieceRule { export interface PieceRule {
@ -116,6 +117,8 @@ export interface PieceMovement {
parallel?: PieceMovement[]; parallel?: PieceMovement[];
/** Arpeggio configuration for data-driven batch processing. When set, this movement reads from a data source, expands templates, and calls LLM per batch. */ /** Arpeggio configuration for data-driven batch processing. When set, this movement reads from a data source, expands templates, and calls LLM per batch. */
arpeggio?: ArpeggioMovementConfig; arpeggio?: ArpeggioMovementConfig;
/** Team leader configuration for dynamic part decomposition + parallel execution */
teamLeader?: TeamLeaderConfig;
/** Resolved policy content strings (from piece-level policies map, resolved at parse time) */ /** Resolved policy content strings (from piece-level policies map, resolved at parse time) */
policyContents?: string[]; policyContents?: string[];
/** Resolved knowledge content strings (from piece-level knowledge map, resolved at parse time) */ /** Resolved knowledge content strings (from piece-level knowledge map, resolved at parse time) */

View File

@ -170,6 +170,24 @@ export const ArpeggioConfigRawSchema = z.object({
output_path: z.string().optional(), output_path: z.string().optional(),
}); });
/** Team leader configuration schema for dynamic part decomposition */
export const TeamLeaderConfigRawSchema = z.object({
/** Persona reference for team leader agent */
persona: z.string().optional(),
/** Maximum number of parts (must be <= 3) */
max_parts: z.number().int().positive().max(3).optional().default(3),
/** Default timeout per part in milliseconds */
timeout_ms: z.number().int().positive().optional().default(600000),
/** Persona reference for part agents */
part_persona: z.string().optional(),
/** Allowed tools for part agents */
part_allowed_tools: z.array(z.string()).optional(),
/** Whether part agents can edit files */
part_edit: z.boolean().optional(),
/** Permission mode for part agents */
part_permission_mode: PermissionModeSchema.optional(),
});
/** Sub-movement schema for parallel execution */ /** Sub-movement schema for parallel execution */
export const ParallelSubMovementRawSchema = z.object({ export const ParallelSubMovementRawSchema = z.object({
name: z.string().min(1), name: z.string().min(1),
@ -232,7 +250,15 @@ export const PieceMovementRawSchema = z.object({
parallel: z.array(ParallelSubMovementRawSchema).optional(), parallel: z.array(ParallelSubMovementRawSchema).optional(),
/** Arpeggio configuration for data-driven batch processing */ /** Arpeggio configuration for data-driven batch processing */
arpeggio: ArpeggioConfigRawSchema.optional(), arpeggio: ArpeggioConfigRawSchema.optional(),
}); /** Team leader configuration for dynamic part decomposition */
team_leader: TeamLeaderConfigRawSchema.optional(),
}).refine(
(data) => [data.parallel, data.arpeggio, data.team_leader].filter((v) => v != null).length <= 1,
{
message: "'parallel', 'arpeggio', and 'team_leader' are mutually exclusive",
path: ['parallel'],
},
);
/** Loop monitor rule schema */ /** Loop monitor rule schema */
export const LoopMonitorRuleSchema = z.object({ export const LoopMonitorRuleSchema = z.object({

View File

@ -23,6 +23,13 @@ export type {
SessionState, SessionState,
} from './session.js'; } from './session.js';
// Part decomposition
export type {
PartDefinition,
PartResult,
TeamLeaderConfig,
} from './part.js';
// Piece configuration and runtime state // Piece configuration and runtime state
export type { export type {
PieceRule, PieceRule,

View File

@ -31,6 +31,7 @@ import { OptionsBuilder } from './OptionsBuilder.js';
import { MovementExecutor } from './MovementExecutor.js'; import { MovementExecutor } from './MovementExecutor.js';
import { ParallelRunner } from './ParallelRunner.js'; import { ParallelRunner } from './ParallelRunner.js';
import { ArpeggioRunner } from './ArpeggioRunner.js'; import { ArpeggioRunner } from './ArpeggioRunner.js';
import { TeamLeaderRunner } from './TeamLeaderRunner.js';
import { buildRunPaths, type RunPaths } from '../run/run-paths.js'; import { buildRunPaths, type RunPaths } from '../run/run-paths.js';
const log = createLogger('engine'); const log = createLogger('engine');
@ -63,6 +64,7 @@ export class PieceEngine extends EventEmitter {
private readonly movementExecutor: MovementExecutor; private readonly movementExecutor: MovementExecutor;
private readonly parallelRunner: ParallelRunner; private readonly parallelRunner: ParallelRunner;
private readonly arpeggioRunner: ArpeggioRunner; private readonly arpeggioRunner: ArpeggioRunner;
private readonly teamLeaderRunner: TeamLeaderRunner;
private readonly detectRuleIndex: (content: string, movementName: string) => number; private readonly detectRuleIndex: (content: string, movementName: string) => number;
private readonly callAiJudge: ( private readonly callAiJudge: (
agentOutput: string, agentOutput: string,
@ -163,6 +165,22 @@ export class PieceEngine extends EventEmitter {
}, },
}); });
this.teamLeaderRunner = new TeamLeaderRunner({
optionsBuilder: this.optionsBuilder,
movementExecutor: this.movementExecutor,
engineOptions: this.options,
getCwd: () => this.cwd,
getInteractive: () => this.options.interactive === true,
detectRuleIndex: this.detectRuleIndex,
callAiJudge: this.callAiJudge,
onPhaseStart: (step, phase, phaseName, instruction) => {
this.emit('phase:start', step, phase, phaseName, instruction);
},
onPhaseComplete: (step, phase, phaseName, content, phaseStatus, error) => {
this.emit('phase:complete', step, phase, phaseName, content, phaseStatus, error);
},
});
log.debug('PieceEngine initialized', { log.debug('PieceEngine initialized', {
piece: config.name, piece: config.name,
movements: config.movements.map(s => s.name), movements: config.movements.map(s => s.name),
@ -337,6 +355,10 @@ export class PieceEngine extends EventEmitter {
result = await this.arpeggioRunner.runArpeggioMovement( result = await this.arpeggioRunner.runArpeggioMovement(
step, this.state, step, this.state,
); );
} else if (step.teamLeader) {
result = await this.teamLeaderRunner.runTeamLeaderMovement(
step, this.state, this.task, this.config.maxMovements, updateSession,
);
} else { } else {
result = await this.movementExecutor.runNormalMovement( result = await this.movementExecutor.runNormalMovement(
step, this.state, this.task, this.config.maxMovements, updateSession, prebuiltInstruction, step, this.state, this.task, this.config.maxMovements, updateSession, prebuiltInstruction,
@ -531,8 +553,8 @@ export class PieceEngine extends EventEmitter {
this.state.iteration++; this.state.iteration++;
// Build instruction before emitting movement:start so listeners can log it. // Build instruction before emitting movement:start so listeners can log it.
// Parallel and arpeggio movements handle iteration incrementing internally. // Parallel/arpeggio/team_leader movements handle iteration incrementing internally.
const isDelegated = (movement.parallel && movement.parallel.length > 0) || !!movement.arpeggio; const isDelegated = (movement.parallel && movement.parallel.length > 0) || !!movement.arpeggio || !!movement.teamLeader;
let prebuiltInstruction: string | undefined; let prebuiltInstruction: string | undefined;
if (!isDelegated) { if (!isDelegated) {
const movementIteration = incrementMovementIteration(this.state, movement.name); const movementIteration = incrementMovementIteration(this.state, movement.name);
@ -562,7 +584,7 @@ export class PieceEngine extends EventEmitter {
} }
if (response.status === 'error') { if (response.status === 'error') {
const detail = response.error ?? response.content ?? `Movement "${movement.name}" returned error status`; const detail = response.error ?? response.content;
this.state.status = 'aborted'; this.state.status = 'aborted';
this.emit('piece:abort', this.state, `Movement "${movement.name}" failed: ${detail}`); this.emit('piece:abort', this.state, `Movement "${movement.name}" failed: ${detail}`);
break; break;

View File

@ -0,0 +1,282 @@
import { runAgent } from '../../../agents/runner.js';
import type {
PieceMovement,
PieceState,
AgentResponse,
PartDefinition,
PartResult,
} from '../../models/types.js';
import { detectMatchedRule } from '../evaluation/index.js';
import { buildSessionKey } from '../session-key.js';
import { ParallelLogger } from './parallel-logger.js';
import { incrementMovementIteration } from './state-manager.js';
import { parseParts } from './task-decomposer.js';
import { buildAbortSignal } from './abort-signal.js';
import { createLogger, getErrorMessage } from '../../../shared/utils/index.js';
import type { OptionsBuilder } from './OptionsBuilder.js';
import type { MovementExecutor } from './MovementExecutor.js';
import type { PieceEngineOptions, PhaseName } from '../types.js';
import type { ParallelLoggerOptions } from './parallel-logger.js';
const log = createLogger('team-leader-runner');
function resolvePartErrorDetail(partResult: PartResult): string {
const detail = partResult.response.error ?? partResult.response.content;
if (!detail) {
throw new Error(`Part "${partResult.part.id}" failed without error detail`);
}
return detail;
}
export interface TeamLeaderRunnerDeps {
readonly optionsBuilder: OptionsBuilder;
readonly movementExecutor: MovementExecutor;
readonly engineOptions: PieceEngineOptions;
readonly getCwd: () => string;
readonly getInteractive: () => boolean;
readonly detectRuleIndex: (content: string, movementName: string) => number;
readonly callAiJudge: (
agentOutput: string,
conditions: Array<{ index: number; text: string }>,
options: { cwd: string }
) => Promise<number>;
readonly onPhaseStart?: (step: PieceMovement, phase: 1 | 2 | 3, phaseName: PhaseName, instruction: string) => void;
readonly onPhaseComplete?: (step: PieceMovement, phase: 1 | 2 | 3, phaseName: PhaseName, content: string, status: string, error?: string) => void;
}
function createPartMovement(step: PieceMovement, part: PartDefinition): PieceMovement {
if (!step.teamLeader) {
throw new Error(`Movement "${step.name}" has no teamLeader configuration`);
}
return {
name: `${step.name}.${part.id}`,
description: part.title,
persona: step.teamLeader.partPersona ?? step.persona,
personaPath: step.teamLeader.partPersonaPath ?? step.personaPath,
personaDisplayName: `${step.name}:${part.id}`,
session: 'refresh',
allowedTools: step.teamLeader.partAllowedTools ?? step.allowedTools,
mcpServers: step.mcpServers,
provider: step.provider,
model: step.model,
permissionMode: step.teamLeader.partPermissionMode ?? step.permissionMode,
edit: step.teamLeader.partEdit ?? step.edit,
instructionTemplate: part.instruction,
passPreviousResponse: false,
};
}
export class TeamLeaderRunner {
constructor(
private readonly deps: TeamLeaderRunnerDeps,
) {}
async runTeamLeaderMovement(
step: PieceMovement,
state: PieceState,
task: string,
maxMovements: number,
updatePersonaSession: (persona: string, sessionId: string | undefined) => void,
): Promise<{ response: AgentResponse; instruction: string }> {
if (!step.teamLeader) {
throw new Error(`Movement "${step.name}" has no teamLeader configuration`);
}
const teamLeaderConfig = step.teamLeader;
const movementIteration = incrementMovementIteration(state, step.name);
const leaderStep: PieceMovement = {
...step,
persona: teamLeaderConfig.persona ?? step.persona,
personaPath: teamLeaderConfig.personaPath ?? step.personaPath,
};
const instruction = this.deps.movementExecutor.buildInstruction(
leaderStep,
movementIteration,
state,
task,
maxMovements,
);
this.deps.onPhaseStart?.(leaderStep, 1, 'execute', instruction);
const leaderResponse = await runAgent(
leaderStep.persona,
instruction,
this.deps.optionsBuilder.buildAgentOptions(leaderStep),
);
updatePersonaSession(buildSessionKey(leaderStep), leaderResponse.sessionId);
this.deps.onPhaseComplete?.(
leaderStep,
1,
'execute',
leaderResponse.content,
leaderResponse.status,
leaderResponse.error,
);
if (leaderResponse.status === 'error') {
const detail = leaderResponse.error ?? leaderResponse.content;
throw new Error(`Team leader failed: ${detail}`);
}
const parts = parseParts(leaderResponse.content, teamLeaderConfig.maxParts);
log.debug('Team leader decomposed parts', {
movement: step.name,
partCount: parts.length,
partIds: parts.map((part) => part.id),
});
const parallelLogger = this.deps.engineOptions.onStream
? new ParallelLogger(this.buildParallelLoggerOptions(
step.name,
movementIteration,
parts.map((part) => part.id),
state.iteration,
maxMovements,
))
: undefined;
const settled = await Promise.allSettled(
parts.map((part, index) => this.runSinglePart(
step,
part,
index,
teamLeaderConfig.timeoutMs,
updatePersonaSession,
parallelLogger,
)),
);
const partResults: PartResult[] = settled.map((result, index) => {
const part = parts[index];
if (!part) {
throw new Error(`Missing part at index ${index}`);
}
if (result.status === 'fulfilled') {
state.movementOutputs.set(result.value.response.persona, result.value.response);
return result.value;
}
const errorMsg = getErrorMessage(result.reason);
const errorResponse: AgentResponse = {
persona: `${step.name}.${part.id}`,
status: 'error',
content: '',
timestamp: new Date(),
error: errorMsg,
};
state.movementOutputs.set(errorResponse.persona, errorResponse);
return { part, response: errorResponse };
});
const allFailed = partResults.every((result) => result.response.status === 'error');
if (allFailed) {
const errors = partResults.map((result) => `${result.part.id}: ${resolvePartErrorDetail(result)}`).join('; ');
throw new Error(`All team leader parts failed: ${errors}`);
}
if (parallelLogger) {
parallelLogger.printSummary(
step.name,
partResults.map((result) => ({ name: result.part.id, condition: undefined })),
);
}
const aggregatedContent = [
'## decomposition',
leaderResponse.content,
...partResults.map((result) => [
`## ${result.part.id}: ${result.part.title}`,
result.response.status === 'error'
? `[ERROR] ${resolvePartErrorDetail(result)}`
: result.response.content,
].join('\n')),
].join('\n\n---\n\n');
const ruleCtx = {
state,
cwd: this.deps.getCwd(),
interactive: this.deps.getInteractive(),
detectRuleIndex: this.deps.detectRuleIndex,
callAiJudge: this.deps.callAiJudge,
};
const match = await detectMatchedRule(step, aggregatedContent, '', ruleCtx);
const aggregatedResponse: AgentResponse = {
persona: step.name,
status: 'done',
content: aggregatedContent,
timestamp: new Date(),
...(match && { matchedRuleIndex: match.index, matchedRuleMethod: match.method }),
};
state.movementOutputs.set(step.name, aggregatedResponse);
state.lastOutput = aggregatedResponse;
this.deps.movementExecutor.persistPreviousResponseSnapshot(
state,
step.name,
movementIteration,
aggregatedResponse.content,
);
this.deps.movementExecutor.emitMovementReports(step);
return { response: aggregatedResponse, instruction };
}
private async runSinglePart(
step: PieceMovement,
part: PartDefinition,
partIndex: number,
defaultTimeoutMs: number,
updatePersonaSession: (persona: string, sessionId: string | undefined) => void,
parallelLogger: ParallelLogger | undefined,
): Promise<PartResult> {
const partMovement = createPartMovement(step, part);
const baseOptions = this.deps.optionsBuilder.buildAgentOptions(partMovement);
const timeoutMs = part.timeoutMs ?? defaultTimeoutMs;
const { signal, dispose } = buildAbortSignal(timeoutMs, baseOptions.abortSignal);
const options = parallelLogger
? { ...baseOptions, abortSignal: signal, onStream: parallelLogger.createStreamHandler(part.id, partIndex) }
: { ...baseOptions, abortSignal: signal };
try {
const response = await runAgent(partMovement.persona, part.instruction, options);
updatePersonaSession(buildSessionKey(partMovement), response.sessionId);
return {
part,
response: {
...response,
persona: partMovement.name,
},
};
} finally {
dispose();
}
}
private buildParallelLoggerOptions(
movementName: string,
movementIteration: number,
subMovementNames: string[],
iteration: number,
maxMovements: number,
): ParallelLoggerOptions {
const options: ParallelLoggerOptions = {
subMovementNames,
parentOnStream: this.deps.engineOptions.onStream,
progressInfo: { iteration, maxMovements },
};
if (this.deps.engineOptions.taskPrefix != null && this.deps.engineOptions.taskColorIndex != null) {
return {
...options,
taskLabel: this.deps.engineOptions.taskPrefix,
taskColorIndex: this.deps.engineOptions.taskColorIndex,
parentMovementName: movementName,
movementIteration,
};
}
return options;
}
}

View File

@ -0,0 +1,29 @@
export function buildAbortSignal(
timeoutMs: number,
parentSignal: AbortSignal | undefined,
): { signal: AbortSignal; dispose: () => void } {
const timeoutController = new AbortController();
const timeoutId = setTimeout(() => {
timeoutController.abort(new Error(`Part timeout after ${timeoutMs}ms`));
}, timeoutMs);
let abortListener: (() => void) | undefined;
if (parentSignal) {
abortListener = () => timeoutController.abort(parentSignal.reason);
if (parentSignal.aborted) {
abortListener();
} else {
parentSignal.addEventListener('abort', abortListener, { once: true });
}
}
return {
signal: timeoutController.signal,
dispose: () => {
clearTimeout(timeoutId);
if (parentSignal && abortListener) {
parentSignal.removeEventListener('abort', abortListener);
}
},
};
}

View File

@ -9,6 +9,7 @@ export { MovementExecutor } from './MovementExecutor.js';
export type { MovementExecutorDeps } from './MovementExecutor.js'; export type { MovementExecutorDeps } from './MovementExecutor.js';
export { ParallelRunner } from './ParallelRunner.js'; export { ParallelRunner } from './ParallelRunner.js';
export { ArpeggioRunner } from './ArpeggioRunner.js'; export { ArpeggioRunner } from './ArpeggioRunner.js';
export { TeamLeaderRunner } from './TeamLeaderRunner.js';
export { OptionsBuilder } from './OptionsBuilder.js'; export { OptionsBuilder } from './OptionsBuilder.js';
export { CycleDetector } from './cycle-detector.js'; export { CycleDetector } from './cycle-detector.js';
export type { CycleCheckResult } from './cycle-detector.js'; export type { CycleCheckResult } from './cycle-detector.js';

View File

@ -0,0 +1,79 @@
import type { PartDefinition } from '../../models/part.js';
const JSON_CODE_BLOCK_REGEX = /```json\s*([\s\S]*?)```/g;
function parseJsonBlock(content: string): unknown {
let lastJsonBlock: string | undefined;
let match: RegExpExecArray | null;
while ((match = JSON_CODE_BLOCK_REGEX.exec(content)) !== null) {
if (match[1]) {
lastJsonBlock = match[1].trim();
}
}
if (!lastJsonBlock) {
throw new Error('Team leader output must include a ```json ... ``` block');
}
try {
return JSON.parse(lastJsonBlock) as unknown;
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
throw new Error(`Failed to parse part JSON: ${message}`);
}
}
function assertString(value: unknown, fieldName: string, index: number): string {
if (typeof value !== 'string' || value.trim().length === 0) {
throw new Error(`Part[${index}] "${fieldName}" must be a non-empty string`);
}
return value;
}
function parsePartEntry(entry: unknown, index: number): PartDefinition {
if (typeof entry !== 'object' || entry == null || Array.isArray(entry)) {
throw new Error(`Part[${index}] must be an object`);
}
const raw = entry as Record<string, unknown>;
const id = assertString(raw.id, 'id', index);
const title = assertString(raw.title, 'title', index);
const instruction = assertString(raw.instruction, 'instruction', index);
const timeoutMs = raw.timeout_ms;
if (timeoutMs != null && (typeof timeoutMs !== 'number' || !Number.isInteger(timeoutMs) || timeoutMs <= 0)) {
throw new Error(`Part[${index}] "timeout_ms" must be a positive integer`);
}
return {
id,
title,
instruction,
timeoutMs: timeoutMs as number | undefined,
};
}
export function parseParts(content: string, maxParts: number): PartDefinition[] {
const parsed = parseJsonBlock(content);
if (!Array.isArray(parsed)) {
throw new Error('Team leader JSON must be an array');
}
if (parsed.length === 0) {
throw new Error('Team leader JSON must contain at least one part');
}
if (parsed.length > maxParts) {
throw new Error(`Team leader produced too many parts: ${parsed.length} > ${maxParts}`);
}
const parts = parsed.map((entry, index) => parsePartEntry(entry, index));
const ids = new Set<string>();
for (const part of parts) {
if (ids.has(part.id)) {
throw new Error(`Duplicate part id: ${part.id}`);
}
ids.add(part.id);
}
return parts;
}

View File

@ -10,7 +10,7 @@ import { dirname, resolve } from 'node:path';
import { parse as parseYaml } from 'yaml'; import { parse as parseYaml } from 'yaml';
import type { z } from 'zod'; import type { z } from 'zod';
import { PieceConfigRawSchema, PieceMovementRawSchema } from '../../../core/models/index.js'; import { PieceConfigRawSchema, PieceMovementRawSchema } from '../../../core/models/index.js';
import type { PieceConfig, PieceMovement, PieceRule, OutputContractEntry, OutputContractLabelPath, OutputContractItem, LoopMonitorConfig, LoopMonitorJudge, ArpeggioMovementConfig, ArpeggioMergeMovementConfig } from '../../../core/models/index.js'; import type { PieceConfig, PieceMovement, PieceRule, OutputContractEntry, OutputContractLabelPath, OutputContractItem, LoopMonitorConfig, LoopMonitorJudge, ArpeggioMovementConfig, ArpeggioMergeMovementConfig, TeamLeaderConfig } from '../../../core/models/index.js';
import { getLanguage } from '../global/globalConfig.js'; import { getLanguage } from '../global/globalConfig.js';
import { import {
type PieceSections, type PieceSections,
@ -179,6 +179,31 @@ function normalizeArpeggio(
}; };
} }
/** Normalize raw team_leader config from YAML into internal format. */
function normalizeTeamLeader(
raw: RawStep['team_leader'],
pieceDir: string,
sections: PieceSections,
context?: FacetResolutionContext,
): TeamLeaderConfig | undefined {
if (!raw) return undefined;
const { personaSpec, personaPath } = resolvePersona(raw.persona, sections, pieceDir, context);
const { personaSpec: partPersona, personaPath: partPersonaPath } = resolvePersona(raw.part_persona, sections, pieceDir, context);
return {
persona: personaSpec,
personaPath,
maxParts: raw.max_parts,
timeoutMs: raw.timeout_ms,
partPersona,
partPersonaPath,
partAllowedTools: raw.part_allowed_tools,
partEdit: raw.part_edit,
partPermissionMode: raw.part_permission_mode,
};
}
/** Normalize a raw step into internal PieceMovement format. */ /** Normalize a raw step into internal PieceMovement format. */
function normalizeStepFromRaw( function normalizeStepFromRaw(
step: RawStep, step: RawStep,
@ -237,6 +262,11 @@ function normalizeStepFromRaw(
result.arpeggio = arpeggioConfig; result.arpeggio = arpeggioConfig;
} }
const teamLeaderConfig = normalizeTeamLeader(step.team_leader, pieceDir, sections, context);
if (teamLeaderConfig) {
result.teamLeader = teamLeaderConfig;
}
return result; return result;
} }