diff --git a/src/__tests__/abort-signal.test.ts b/src/__tests__/abort-signal.test.ts new file mode 100644 index 0000000..c3b80b2 --- /dev/null +++ b/src/__tests__/abort-signal.test.ts @@ -0,0 +1,68 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { buildAbortSignal } from '../core/piece/engine/abort-signal.js'; + +describe('buildAbortSignal', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it('タイムアウトでabortされる', () => { + const { signal, dispose } = buildAbortSignal(100, undefined); + + expect(signal.aborted).toBe(false); + vi.advanceTimersByTime(100); + expect(signal.aborted).toBe(true); + expect(signal.reason).toBeInstanceOf(Error); + expect((signal.reason as Error).message).toBe('Part timeout after 100ms'); + + dispose(); + }); + + it('親シグナルがabortされると子シグナルへ伝搬する', () => { + const parent = new AbortController(); + const { signal, dispose } = buildAbortSignal(1000, parent.signal); + const reason = new Error('parent aborted'); + + parent.abort(reason); + + expect(signal.aborted).toBe(true); + expect(signal.reason).toBe(reason); + + dispose(); + }); + + it('disposeでタイマーと親リスナーを解放する', () => { + const parent = new AbortController(); + const addSpy = vi.spyOn(parent.signal, 'addEventListener'); + const removeSpy = vi.spyOn(parent.signal, 'removeEventListener'); + const { signal, dispose } = buildAbortSignal(100, parent.signal); + + expect(addSpy).toHaveBeenCalledTimes(1); + + dispose(); + vi.advanceTimersByTime(200); + + expect(signal.aborted).toBe(false); + expect(removeSpy).toHaveBeenCalledTimes(1); + }); + + it('親シグナルが既にabort済みなら即時伝搬する', () => { + const parent = new AbortController(); + const reason = new Error('already aborted'); + const addSpy = vi.spyOn(parent.signal, 'addEventListener'); + parent.abort(reason); + + const { signal, dispose } = buildAbortSignal(1000, parent.signal); + + expect(signal.aborted).toBe(true); + expect(signal.reason).toBe(reason); + expect(addSpy).not.toHaveBeenCalled(); + + dispose(); + }); +}); diff --git a/src/__tests__/engine-team-leader.test.ts b/src/__tests__/engine-team-leader.test.ts new file mode 100644 index 0000000..cd35a8d --- /dev/null +++ b/src/__tests__/engine-team-leader.test.ts @@ -0,0 +1,172 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { existsSync, rmSync } from 'node:fs'; +import { runAgent } from '../agents/runner.js'; +import { detectMatchedRule } from '../core/piece/evaluation/index.js'; +import { PieceEngine } from '../core/piece/engine/PieceEngine.js'; +import { makeMovement, makeRule, makeResponse, createTestTmpDir, applyDefaultMocks } from './engine-test-helpers.js'; +import type { PieceConfig } from '../core/models/index.js'; + +vi.mock('../agents/runner.js', () => ({ + runAgent: vi.fn(), +})); + +vi.mock('../core/piece/evaluation/index.js', () => ({ + detectMatchedRule: vi.fn(), +})); + +vi.mock('../core/piece/phase-runner.js', () => ({ + needsStatusJudgmentPhase: vi.fn().mockReturnValue(false), + runReportPhase: vi.fn().mockResolvedValue(undefined), + runStatusJudgmentPhase: vi.fn().mockResolvedValue(''), +})); + +vi.mock('../shared/utils/index.js', async (importOriginal) => ({ + ...(await importOriginal>()), + generateReportDir: vi.fn().mockReturnValue('test-report-dir'), +})); + +function buildTeamLeaderConfig(): PieceConfig { + return { + name: 'team-leader-piece', + initialMovement: 'implement', + maxMovements: 5, + movements: [ + makeMovement('implement', { + instructionTemplate: 'Task: {task}', + teamLeader: { + persona: '../personas/team-leader.md', + maxParts: 3, + timeoutMs: 10000, + partPersona: '../personas/coder.md', + partAllowedTools: ['Read', 'Edit', 'Write'], + partEdit: true, + partPermissionMode: 'edit', + }, + rules: [makeRule('done', 'COMPLETE')], + }), + ], + }; +} + +describe('PieceEngine Integration: TeamLeaderRunner', () => { + let tmpDir: string; + + beforeEach(() => { + vi.resetAllMocks(); + applyDefaultMocks(); + tmpDir = createTestTmpDir(); + }); + + afterEach(() => { + if (existsSync(tmpDir)) { + rmSync(tmpDir, { recursive: true, force: true }); + } + }); + + it('team leaderが分解したパートを並列実行し集約する', async () => { + const config = buildTeamLeaderConfig(); + const engine = new PieceEngine(config, tmpDir, 'implement feature', { projectCwd: tmpDir }); + + vi.mocked(runAgent) + .mockResolvedValueOnce(makeResponse({ + persona: 'team-leader', + content: [ + '```json', + '[{"id":"part-1","title":"API","instruction":"Implement API"},{"id":"part-2","title":"Test","instruction":"Add tests"}]', + '```', + ].join('\n'), + })) + .mockResolvedValueOnce(makeResponse({ persona: 'coder', content: 'API done' })) + .mockResolvedValueOnce(makeResponse({ persona: 'coder', content: 'Tests done' })); + + vi.mocked(detectMatchedRule).mockResolvedValueOnce({ index: 0, method: 'phase1_tag' }); + + const state = await engine.run(); + + expect(state.status).toBe('completed'); + expect(vi.mocked(runAgent)).toHaveBeenCalledTimes(3); + const output = state.movementOutputs.get('implement'); + expect(output).toBeDefined(); + expect(output!.content).toContain('## decomposition'); + expect(output!.content).toContain('## part-1: API'); + expect(output!.content).toContain('API done'); + expect(output!.content).toContain('## part-2: Test'); + expect(output!.content).toContain('Tests done'); + }); + + it('全パートが失敗した場合はムーブメント失敗として中断する', async () => { + const config = buildTeamLeaderConfig(); + const engine = new PieceEngine(config, tmpDir, 'implement feature', { projectCwd: tmpDir }); + + vi.mocked(runAgent) + .mockResolvedValueOnce(makeResponse({ + persona: 'team-leader', + content: [ + '```json', + '[{"id":"part-1","title":"API","instruction":"Implement API"},{"id":"part-2","title":"Test","instruction":"Add tests"}]', + '```', + ].join('\n'), + })) + .mockResolvedValueOnce(makeResponse({ persona: 'coder', status: 'error', error: 'api failed' })) + .mockResolvedValueOnce(makeResponse({ persona: 'coder', status: 'error', error: 'test failed' })); + + const state = await engine.run(); + + expect(state.status).toBe('aborted'); + }); + + it('一部パートが失敗しても成功パートがあれば集約結果は完了する', async () => { + const config = buildTeamLeaderConfig(); + const engine = new PieceEngine(config, tmpDir, 'implement feature', { projectCwd: tmpDir }); + + vi.mocked(runAgent) + .mockResolvedValueOnce(makeResponse({ + persona: 'team-leader', + content: [ + '```json', + '[{"id":"part-1","title":"API","instruction":"Implement API"},{"id":"part-2","title":"Test","instruction":"Add tests"}]', + '```', + ].join('\n'), + })) + .mockResolvedValueOnce(makeResponse({ persona: 'coder', content: 'API done' })) + .mockResolvedValueOnce(makeResponse({ persona: 'coder', status: 'error', error: 'test failed' })); + + vi.mocked(detectMatchedRule).mockResolvedValueOnce({ index: 0, method: 'phase1_tag' }); + + const state = await engine.run(); + + expect(state.status).toBe('completed'); + const output = state.movementOutputs.get('implement'); + expect(output).toBeDefined(); + expect(output!.content).toContain('## part-1: API'); + expect(output!.content).toContain('API done'); + expect(output!.content).toContain('## part-2: Test'); + expect(output!.content).toContain('[ERROR] test failed'); + }); + + it('パート失敗時にerrorがなくてもcontentの詳細をエラー表示に使う', async () => { + const config = buildTeamLeaderConfig(); + const engine = new PieceEngine(config, tmpDir, 'implement feature', { projectCwd: tmpDir }); + + vi.mocked(runAgent) + .mockResolvedValueOnce(makeResponse({ + persona: 'team-leader', + content: [ + '```json', + '[{"id":"part-1","title":"API","instruction":"Implement API"},{"id":"part-2","title":"Test","instruction":"Add tests"}]', + '```', + ].join('\n'), + })) + .mockResolvedValueOnce(makeResponse({ persona: 'coder', status: 'error', content: 'api failed from content' })) + .mockResolvedValueOnce(makeResponse({ persona: 'coder', content: 'Tests done' })); + + vi.mocked(detectMatchedRule).mockResolvedValueOnce({ index: 0, method: 'phase1_tag' }); + + const state = await engine.run(); + + expect(state.status).toBe('completed'); + const output = state.movementOutputs.get('implement'); + expect(output).toBeDefined(); + expect(output!.content).toContain('[ERROR] api failed from content'); + }); +}); diff --git a/src/__tests__/task-decomposer.test.ts b/src/__tests__/task-decomposer.test.ts new file mode 100644 index 0000000..d374072 --- /dev/null +++ b/src/__tests__/task-decomposer.test.ts @@ -0,0 +1,76 @@ +import { describe, it, expect } from 'vitest'; +import { parseParts } from '../core/piece/engine/task-decomposer.js'; + +describe('parseParts', () => { + it('最後のjsonコードブロックをパースする', () => { + const content = [ + '説明', + '```json', + '[{"id":"old","title":"old","instruction":"old"}]', + '```', + '最終案', + '```json', + '[{"id":"a","title":"A","instruction":"Do A"},{"id":"b","title":"B","instruction":"Do B","timeout_ms":1200}]', + '```', + ].join('\n'); + + const result = parseParts(content, 3); + + expect(result).toHaveLength(2); + expect(result[0]).toEqual({ + id: 'a', + title: 'A', + instruction: 'Do A', + timeoutMs: undefined, + }); + expect(result[1]!.timeoutMs).toBe(1200); + }); + + it('jsonコードブロックがない場合はエラー', () => { + expect(() => parseParts('no json', 3)).toThrow( + 'Team leader output must include a ```json ... ``` block', + ); + }); + + it('max_partsを超えたらエラー', () => { + const content = '```json\n[{"id":"a","title":"A","instruction":"Do A"},{"id":"b","title":"B","instruction":"Do B"}]\n```'; + + expect(() => parseParts(content, 1)).toThrow( + 'Team leader produced too many parts: 2 > 1', + ); + }); + + it('必須フィールドが不足したらエラー', () => { + const content = '```json\n[{"id":"a","title":"A"}]\n```'; + + expect(() => parseParts(content, 3)).toThrow( + 'Part[0] "instruction" must be a non-empty string', + ); + }); + + it('jsonコードブロックが配列でない場合はエラー', () => { + const content = '```json\n{"not":"array"}\n```'; + + expect(() => parseParts(content, 3)).toThrow( + 'Team leader JSON must be an array', + ); + }); + + it('空配列の場合はエラー', () => { + const content = '```json\n[]\n```'; + + expect(() => parseParts(content, 3)).toThrow( + 'Team leader JSON must contain at least one part', + ); + }); + + it('重複したpart idがある場合はエラー', () => { + const content = [ + '```json', + '[{"id":"dup","title":"A","instruction":"Do A"},{"id":"dup","title":"B","instruction":"Do B"}]', + '```', + ].join('\n'); + + expect(() => parseParts(content, 3)).toThrow('Duplicate part id: dup'); + }); +}); diff --git a/src/__tests__/team-leader-schema-loader.test.ts b/src/__tests__/team-leader-schema-loader.test.ts new file mode 100644 index 0000000..df92632 --- /dev/null +++ b/src/__tests__/team-leader-schema-loader.test.ts @@ -0,0 +1,105 @@ +import { describe, it, expect } from 'vitest'; +import { join } from 'node:path'; +import { PieceMovementRawSchema } from '../core/models/schemas.js'; +import { normalizePieceConfig } from '../infra/config/loaders/pieceParser.js'; + +describe('team_leader schema', () => { + it('max_parts <= 3 の設定を受け付ける', () => { + const raw = { + name: 'implement', + team_leader: { + persona: 'team-leader', + max_parts: 3, + timeout_ms: 120000, + }, + instruction_template: 'decompose', + }; + + const result = PieceMovementRawSchema.safeParse(raw); + expect(result.success).toBe(true); + }); + + it('max_parts > 3 は拒否する', () => { + const raw = { + name: 'implement', + team_leader: { + max_parts: 4, + }, + instruction_template: 'decompose', + }; + + const result = PieceMovementRawSchema.safeParse(raw); + expect(result.success).toBe(false); + }); + + it('parallel と team_leader の同時指定は拒否する', () => { + const raw = { + name: 'implement', + parallel: [{ name: 'sub', instruction_template: 'x' }], + team_leader: { + max_parts: 2, + }, + instruction_template: 'decompose', + }; + + const result = PieceMovementRawSchema.safeParse(raw); + expect(result.success).toBe(false); + }); + + it('arpeggio と team_leader の同時指定は拒否する', () => { + const raw = { + name: 'implement', + arpeggio: { + source: 'csv', + source_path: './data.csv', + template: './prompt.md', + }, + team_leader: { + max_parts: 2, + }, + instruction_template: 'decompose', + }; + + const result = PieceMovementRawSchema.safeParse(raw); + expect(result.success).toBe(false); + }); +}); + +describe('normalizePieceConfig team_leader', () => { + it('team_leader を内部形式へ正規化する', () => { + const pieceDir = join(process.cwd(), 'src', '__tests__'); + const raw = { + name: 'piece', + movements: [ + { + name: 'implement', + team_leader: { + persona: 'team-leader', + max_parts: 2, + timeout_ms: 90000, + part_persona: 'coder', + part_allowed_tools: ['Read', 'Edit'], + part_edit: true, + part_permission_mode: 'edit', + }, + instruction_template: 'decompose', + }, + ], + }; + + const config = normalizePieceConfig(raw, pieceDir); + const movement = config.movements[0]; + expect(movement).toBeDefined(); + expect(movement!.teamLeader).toEqual({ + persona: 'team-leader', + personaPath: undefined, + maxParts: 2, + timeoutMs: 90000, + partPersona: 'coder', + partPersonaPath: undefined, + partAllowedTools: ['Read', 'Edit'], + partEdit: true, + partPermissionMode: 'edit', + }); + }); +}); diff --git a/src/core/models/index.ts b/src/core/models/index.ts index 8221700..9177e1b 100644 --- a/src/core/models/index.ts +++ b/src/core/models/index.ts @@ -10,6 +10,9 @@ export type { McpServerConfig, AgentResponse, SessionState, + PartDefinition, + PartResult, + TeamLeaderConfig, PieceRule, PieceMovement, ArpeggioMovementConfig, diff --git a/src/core/models/part.ts b/src/core/models/part.ts new file mode 100644 index 0000000..c04e8a5 --- /dev/null +++ b/src/core/models/part.ts @@ -0,0 +1,42 @@ +import type { PermissionMode } from './status.js'; +import type { AgentResponse } from './response.js'; + +/** Part definition produced by movement team leader agent */ +export interface PartDefinition { + /** Unique ID inside the parent movement */ + id: string; + /** Human-readable title */ + title: string; + /** Instruction passed to the part agent */ + instruction: string; + /** Optional per-part timeout in milliseconds */ + timeoutMs?: number; +} + +/** Result of a single part execution */ +export interface PartResult { + part: PartDefinition; + response: AgentResponse; +} + +/** team_leader config on a movement */ +export interface TeamLeaderConfig { + /** Persona reference for the team leader agent */ + persona?: string; + /** Resolved absolute path for team leader persona */ + personaPath?: string; + /** Maximum number of parts to run in parallel */ + maxParts: number; + /** Default timeout for parts in milliseconds */ + timeoutMs: number; + /** Persona reference for part agents */ + partPersona?: string; + /** Resolved absolute path for part persona */ + partPersonaPath?: string; + /** Allowed tools for part agents */ + partAllowedTools?: string[]; + /** Whether part agents can edit files */ + partEdit?: boolean; + /** Permission mode for part agents */ + partPermissionMode?: PermissionMode; +} diff --git a/src/core/models/piece-types.ts b/src/core/models/piece-types.ts index 7c029cc..49deeec 100644 --- a/src/core/models/piece-types.ts +++ b/src/core/models/piece-types.ts @@ -5,6 +5,7 @@ import type { PermissionMode } from './status.js'; import type { AgentResponse } from './response.js'; import type { InteractiveMode } from './interactive-mode.js'; +import type { TeamLeaderConfig } from './part.js'; /** Rule-based transition configuration (unified format) */ export interface PieceRule { @@ -116,6 +117,8 @@ export interface PieceMovement { parallel?: PieceMovement[]; /** Arpeggio configuration for data-driven batch processing. When set, this movement reads from a data source, expands templates, and calls LLM per batch. */ arpeggio?: ArpeggioMovementConfig; + /** Team leader configuration for dynamic part decomposition + parallel execution */ + teamLeader?: TeamLeaderConfig; /** Resolved policy content strings (from piece-level policies map, resolved at parse time) */ policyContents?: string[]; /** Resolved knowledge content strings (from piece-level knowledge map, resolved at parse time) */ diff --git a/src/core/models/schemas.ts b/src/core/models/schemas.ts index 6ab743d..0c0f812 100644 --- a/src/core/models/schemas.ts +++ b/src/core/models/schemas.ts @@ -170,6 +170,24 @@ export const ArpeggioConfigRawSchema = z.object({ output_path: z.string().optional(), }); +/** Team leader configuration schema for dynamic part decomposition */ +export const TeamLeaderConfigRawSchema = z.object({ + /** Persona reference for team leader agent */ + persona: z.string().optional(), + /** Maximum number of parts (must be <= 3) */ + max_parts: z.number().int().positive().max(3).optional().default(3), + /** Default timeout per part in milliseconds */ + timeout_ms: z.number().int().positive().optional().default(600000), + /** Persona reference for part agents */ + part_persona: z.string().optional(), + /** Allowed tools for part agents */ + part_allowed_tools: z.array(z.string()).optional(), + /** Whether part agents can edit files */ + part_edit: z.boolean().optional(), + /** Permission mode for part agents */ + part_permission_mode: PermissionModeSchema.optional(), +}); + /** Sub-movement schema for parallel execution */ export const ParallelSubMovementRawSchema = z.object({ name: z.string().min(1), @@ -232,7 +250,15 @@ export const PieceMovementRawSchema = z.object({ parallel: z.array(ParallelSubMovementRawSchema).optional(), /** Arpeggio configuration for data-driven batch processing */ arpeggio: ArpeggioConfigRawSchema.optional(), -}); + /** Team leader configuration for dynamic part decomposition */ + team_leader: TeamLeaderConfigRawSchema.optional(), +}).refine( + (data) => [data.parallel, data.arpeggio, data.team_leader].filter((v) => v != null).length <= 1, + { + message: "'parallel', 'arpeggio', and 'team_leader' are mutually exclusive", + path: ['parallel'], + }, +); /** Loop monitor rule schema */ export const LoopMonitorRuleSchema = z.object({ diff --git a/src/core/models/types.ts b/src/core/models/types.ts index 42e49e9..84b8340 100644 --- a/src/core/models/types.ts +++ b/src/core/models/types.ts @@ -23,6 +23,13 @@ export type { SessionState, } from './session.js'; +// Part decomposition +export type { + PartDefinition, + PartResult, + TeamLeaderConfig, +} from './part.js'; + // Piece configuration and runtime state export type { PieceRule, diff --git a/src/core/piece/engine/PieceEngine.ts b/src/core/piece/engine/PieceEngine.ts index cfde239..b65a9de 100644 --- a/src/core/piece/engine/PieceEngine.ts +++ b/src/core/piece/engine/PieceEngine.ts @@ -31,6 +31,7 @@ import { OptionsBuilder } from './OptionsBuilder.js'; import { MovementExecutor } from './MovementExecutor.js'; import { ParallelRunner } from './ParallelRunner.js'; import { ArpeggioRunner } from './ArpeggioRunner.js'; +import { TeamLeaderRunner } from './TeamLeaderRunner.js'; import { buildRunPaths, type RunPaths } from '../run/run-paths.js'; const log = createLogger('engine'); @@ -63,6 +64,7 @@ export class PieceEngine extends EventEmitter { private readonly movementExecutor: MovementExecutor; private readonly parallelRunner: ParallelRunner; private readonly arpeggioRunner: ArpeggioRunner; + private readonly teamLeaderRunner: TeamLeaderRunner; private readonly detectRuleIndex: (content: string, movementName: string) => number; private readonly callAiJudge: ( agentOutput: string, @@ -163,6 +165,22 @@ export class PieceEngine extends EventEmitter { }, }); + this.teamLeaderRunner = new TeamLeaderRunner({ + optionsBuilder: this.optionsBuilder, + movementExecutor: this.movementExecutor, + engineOptions: this.options, + getCwd: () => this.cwd, + getInteractive: () => this.options.interactive === true, + detectRuleIndex: this.detectRuleIndex, + callAiJudge: this.callAiJudge, + onPhaseStart: (step, phase, phaseName, instruction) => { + this.emit('phase:start', step, phase, phaseName, instruction); + }, + onPhaseComplete: (step, phase, phaseName, content, phaseStatus, error) => { + this.emit('phase:complete', step, phase, phaseName, content, phaseStatus, error); + }, + }); + log.debug('PieceEngine initialized', { piece: config.name, movements: config.movements.map(s => s.name), @@ -337,6 +355,10 @@ export class PieceEngine extends EventEmitter { result = await this.arpeggioRunner.runArpeggioMovement( step, this.state, ); + } else if (step.teamLeader) { + result = await this.teamLeaderRunner.runTeamLeaderMovement( + step, this.state, this.task, this.config.maxMovements, updateSession, + ); } else { result = await this.movementExecutor.runNormalMovement( step, this.state, this.task, this.config.maxMovements, updateSession, prebuiltInstruction, @@ -531,8 +553,8 @@ export class PieceEngine extends EventEmitter { this.state.iteration++; // Build instruction before emitting movement:start so listeners can log it. - // Parallel and arpeggio movements handle iteration incrementing internally. - const isDelegated = (movement.parallel && movement.parallel.length > 0) || !!movement.arpeggio; + // Parallel/arpeggio/team_leader movements handle iteration incrementing internally. + const isDelegated = (movement.parallel && movement.parallel.length > 0) || !!movement.arpeggio || !!movement.teamLeader; let prebuiltInstruction: string | undefined; if (!isDelegated) { const movementIteration = incrementMovementIteration(this.state, movement.name); @@ -562,7 +584,7 @@ export class PieceEngine extends EventEmitter { } if (response.status === 'error') { - const detail = response.error ?? response.content ?? `Movement "${movement.name}" returned error status`; + const detail = response.error ?? response.content; this.state.status = 'aborted'; this.emit('piece:abort', this.state, `Movement "${movement.name}" failed: ${detail}`); break; diff --git a/src/core/piece/engine/TeamLeaderRunner.ts b/src/core/piece/engine/TeamLeaderRunner.ts new file mode 100644 index 0000000..d97db90 --- /dev/null +++ b/src/core/piece/engine/TeamLeaderRunner.ts @@ -0,0 +1,282 @@ +import { runAgent } from '../../../agents/runner.js'; +import type { + PieceMovement, + PieceState, + AgentResponse, + PartDefinition, + PartResult, +} from '../../models/types.js'; +import { detectMatchedRule } from '../evaluation/index.js'; +import { buildSessionKey } from '../session-key.js'; +import { ParallelLogger } from './parallel-logger.js'; +import { incrementMovementIteration } from './state-manager.js'; +import { parseParts } from './task-decomposer.js'; +import { buildAbortSignal } from './abort-signal.js'; +import { createLogger, getErrorMessage } from '../../../shared/utils/index.js'; +import type { OptionsBuilder } from './OptionsBuilder.js'; +import type { MovementExecutor } from './MovementExecutor.js'; +import type { PieceEngineOptions, PhaseName } from '../types.js'; +import type { ParallelLoggerOptions } from './parallel-logger.js'; + +const log = createLogger('team-leader-runner'); + +function resolvePartErrorDetail(partResult: PartResult): string { + const detail = partResult.response.error ?? partResult.response.content; + if (!detail) { + throw new Error(`Part "${partResult.part.id}" failed without error detail`); + } + return detail; +} + +export interface TeamLeaderRunnerDeps { + readonly optionsBuilder: OptionsBuilder; + readonly movementExecutor: MovementExecutor; + readonly engineOptions: PieceEngineOptions; + readonly getCwd: () => string; + readonly getInteractive: () => boolean; + readonly detectRuleIndex: (content: string, movementName: string) => number; + readonly callAiJudge: ( + agentOutput: string, + conditions: Array<{ index: number; text: string }>, + options: { cwd: string } + ) => Promise; + readonly onPhaseStart?: (step: PieceMovement, phase: 1 | 2 | 3, phaseName: PhaseName, instruction: string) => void; + readonly onPhaseComplete?: (step: PieceMovement, phase: 1 | 2 | 3, phaseName: PhaseName, content: string, status: string, error?: string) => void; +} + +function createPartMovement(step: PieceMovement, part: PartDefinition): PieceMovement { + if (!step.teamLeader) { + throw new Error(`Movement "${step.name}" has no teamLeader configuration`); + } + + return { + name: `${step.name}.${part.id}`, + description: part.title, + persona: step.teamLeader.partPersona ?? step.persona, + personaPath: step.teamLeader.partPersonaPath ?? step.personaPath, + personaDisplayName: `${step.name}:${part.id}`, + session: 'refresh', + allowedTools: step.teamLeader.partAllowedTools ?? step.allowedTools, + mcpServers: step.mcpServers, + provider: step.provider, + model: step.model, + permissionMode: step.teamLeader.partPermissionMode ?? step.permissionMode, + edit: step.teamLeader.partEdit ?? step.edit, + instructionTemplate: part.instruction, + passPreviousResponse: false, + }; +} + +export class TeamLeaderRunner { + constructor( + private readonly deps: TeamLeaderRunnerDeps, + ) {} + + async runTeamLeaderMovement( + step: PieceMovement, + state: PieceState, + task: string, + maxMovements: number, + updatePersonaSession: (persona: string, sessionId: string | undefined) => void, + ): Promise<{ response: AgentResponse; instruction: string }> { + if (!step.teamLeader) { + throw new Error(`Movement "${step.name}" has no teamLeader configuration`); + } + const teamLeaderConfig = step.teamLeader; + + const movementIteration = incrementMovementIteration(state, step.name); + const leaderStep: PieceMovement = { + ...step, + persona: teamLeaderConfig.persona ?? step.persona, + personaPath: teamLeaderConfig.personaPath ?? step.personaPath, + }; + const instruction = this.deps.movementExecutor.buildInstruction( + leaderStep, + movementIteration, + state, + task, + maxMovements, + ); + + this.deps.onPhaseStart?.(leaderStep, 1, 'execute', instruction); + const leaderResponse = await runAgent( + leaderStep.persona, + instruction, + this.deps.optionsBuilder.buildAgentOptions(leaderStep), + ); + updatePersonaSession(buildSessionKey(leaderStep), leaderResponse.sessionId); + this.deps.onPhaseComplete?.( + leaderStep, + 1, + 'execute', + leaderResponse.content, + leaderResponse.status, + leaderResponse.error, + ); + if (leaderResponse.status === 'error') { + const detail = leaderResponse.error ?? leaderResponse.content; + throw new Error(`Team leader failed: ${detail}`); + } + + const parts = parseParts(leaderResponse.content, teamLeaderConfig.maxParts); + log.debug('Team leader decomposed parts', { + movement: step.name, + partCount: parts.length, + partIds: parts.map((part) => part.id), + }); + + const parallelLogger = this.deps.engineOptions.onStream + ? new ParallelLogger(this.buildParallelLoggerOptions( + step.name, + movementIteration, + parts.map((part) => part.id), + state.iteration, + maxMovements, + )) + : undefined; + + const settled = await Promise.allSettled( + parts.map((part, index) => this.runSinglePart( + step, + part, + index, + teamLeaderConfig.timeoutMs, + updatePersonaSession, + parallelLogger, + )), + ); + + const partResults: PartResult[] = settled.map((result, index) => { + const part = parts[index]; + if (!part) { + throw new Error(`Missing part at index ${index}`); + } + + if (result.status === 'fulfilled') { + state.movementOutputs.set(result.value.response.persona, result.value.response); + return result.value; + } + + const errorMsg = getErrorMessage(result.reason); + const errorResponse: AgentResponse = { + persona: `${step.name}.${part.id}`, + status: 'error', + content: '', + timestamp: new Date(), + error: errorMsg, + }; + state.movementOutputs.set(errorResponse.persona, errorResponse); + return { part, response: errorResponse }; + }); + + const allFailed = partResults.every((result) => result.response.status === 'error'); + if (allFailed) { + const errors = partResults.map((result) => `${result.part.id}: ${resolvePartErrorDetail(result)}`).join('; '); + throw new Error(`All team leader parts failed: ${errors}`); + } + + if (parallelLogger) { + parallelLogger.printSummary( + step.name, + partResults.map((result) => ({ name: result.part.id, condition: undefined })), + ); + } + + const aggregatedContent = [ + '## decomposition', + leaderResponse.content, + ...partResults.map((result) => [ + `## ${result.part.id}: ${result.part.title}`, + result.response.status === 'error' + ? `[ERROR] ${resolvePartErrorDetail(result)}` + : result.response.content, + ].join('\n')), + ].join('\n\n---\n\n'); + + const ruleCtx = { + state, + cwd: this.deps.getCwd(), + interactive: this.deps.getInteractive(), + detectRuleIndex: this.deps.detectRuleIndex, + callAiJudge: this.deps.callAiJudge, + }; + const match = await detectMatchedRule(step, aggregatedContent, '', ruleCtx); + + const aggregatedResponse: AgentResponse = { + persona: step.name, + status: 'done', + content: aggregatedContent, + timestamp: new Date(), + ...(match && { matchedRuleIndex: match.index, matchedRuleMethod: match.method }), + }; + + state.movementOutputs.set(step.name, aggregatedResponse); + state.lastOutput = aggregatedResponse; + this.deps.movementExecutor.persistPreviousResponseSnapshot( + state, + step.name, + movementIteration, + aggregatedResponse.content, + ); + this.deps.movementExecutor.emitMovementReports(step); + + return { response: aggregatedResponse, instruction }; + } + + private async runSinglePart( + step: PieceMovement, + part: PartDefinition, + partIndex: number, + defaultTimeoutMs: number, + updatePersonaSession: (persona: string, sessionId: string | undefined) => void, + parallelLogger: ParallelLogger | undefined, + ): Promise { + const partMovement = createPartMovement(step, part); + const baseOptions = this.deps.optionsBuilder.buildAgentOptions(partMovement); + const timeoutMs = part.timeoutMs ?? defaultTimeoutMs; + const { signal, dispose } = buildAbortSignal(timeoutMs, baseOptions.abortSignal); + const options = parallelLogger + ? { ...baseOptions, abortSignal: signal, onStream: parallelLogger.createStreamHandler(part.id, partIndex) } + : { ...baseOptions, abortSignal: signal }; + + try { + const response = await runAgent(partMovement.persona, part.instruction, options); + updatePersonaSession(buildSessionKey(partMovement), response.sessionId); + return { + part, + response: { + ...response, + persona: partMovement.name, + }, + }; + } finally { + dispose(); + } + } + + private buildParallelLoggerOptions( + movementName: string, + movementIteration: number, + subMovementNames: string[], + iteration: number, + maxMovements: number, + ): ParallelLoggerOptions { + const options: ParallelLoggerOptions = { + subMovementNames, + parentOnStream: this.deps.engineOptions.onStream, + progressInfo: { iteration, maxMovements }, + }; + + if (this.deps.engineOptions.taskPrefix != null && this.deps.engineOptions.taskColorIndex != null) { + return { + ...options, + taskLabel: this.deps.engineOptions.taskPrefix, + taskColorIndex: this.deps.engineOptions.taskColorIndex, + parentMovementName: movementName, + movementIteration, + }; + } + + return options; + } +} diff --git a/src/core/piece/engine/abort-signal.ts b/src/core/piece/engine/abort-signal.ts new file mode 100644 index 0000000..73e009d --- /dev/null +++ b/src/core/piece/engine/abort-signal.ts @@ -0,0 +1,29 @@ +export function buildAbortSignal( + timeoutMs: number, + parentSignal: AbortSignal | undefined, +): { signal: AbortSignal; dispose: () => void } { + const timeoutController = new AbortController(); + const timeoutId = setTimeout(() => { + timeoutController.abort(new Error(`Part timeout after ${timeoutMs}ms`)); + }, timeoutMs); + + let abortListener: (() => void) | undefined; + if (parentSignal) { + abortListener = () => timeoutController.abort(parentSignal.reason); + if (parentSignal.aborted) { + abortListener(); + } else { + parentSignal.addEventListener('abort', abortListener, { once: true }); + } + } + + return { + signal: timeoutController.signal, + dispose: () => { + clearTimeout(timeoutId); + if (parentSignal && abortListener) { + parentSignal.removeEventListener('abort', abortListener); + } + }, + }; +} diff --git a/src/core/piece/engine/index.ts b/src/core/piece/engine/index.ts index 505be71..2058869 100644 --- a/src/core/piece/engine/index.ts +++ b/src/core/piece/engine/index.ts @@ -9,6 +9,7 @@ export { MovementExecutor } from './MovementExecutor.js'; export type { MovementExecutorDeps } from './MovementExecutor.js'; export { ParallelRunner } from './ParallelRunner.js'; export { ArpeggioRunner } from './ArpeggioRunner.js'; +export { TeamLeaderRunner } from './TeamLeaderRunner.js'; export { OptionsBuilder } from './OptionsBuilder.js'; export { CycleDetector } from './cycle-detector.js'; export type { CycleCheckResult } from './cycle-detector.js'; diff --git a/src/core/piece/engine/task-decomposer.ts b/src/core/piece/engine/task-decomposer.ts new file mode 100644 index 0000000..5754fbe --- /dev/null +++ b/src/core/piece/engine/task-decomposer.ts @@ -0,0 +1,79 @@ +import type { PartDefinition } from '../../models/part.js'; + +const JSON_CODE_BLOCK_REGEX = /```json\s*([\s\S]*?)```/g; + +function parseJsonBlock(content: string): unknown { + let lastJsonBlock: string | undefined; + let match: RegExpExecArray | null; + + while ((match = JSON_CODE_BLOCK_REGEX.exec(content)) !== null) { + if (match[1]) { + lastJsonBlock = match[1].trim(); + } + } + + if (!lastJsonBlock) { + throw new Error('Team leader output must include a ```json ... ``` block'); + } + + try { + return JSON.parse(lastJsonBlock) as unknown; + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + throw new Error(`Failed to parse part JSON: ${message}`); + } +} + +function assertString(value: unknown, fieldName: string, index: number): string { + if (typeof value !== 'string' || value.trim().length === 0) { + throw new Error(`Part[${index}] "${fieldName}" must be a non-empty string`); + } + return value; +} + +function parsePartEntry(entry: unknown, index: number): PartDefinition { + if (typeof entry !== 'object' || entry == null || Array.isArray(entry)) { + throw new Error(`Part[${index}] must be an object`); + } + + const raw = entry as Record; + const id = assertString(raw.id, 'id', index); + const title = assertString(raw.title, 'title', index); + const instruction = assertString(raw.instruction, 'instruction', index); + + const timeoutMs = raw.timeout_ms; + if (timeoutMs != null && (typeof timeoutMs !== 'number' || !Number.isInteger(timeoutMs) || timeoutMs <= 0)) { + throw new Error(`Part[${index}] "timeout_ms" must be a positive integer`); + } + + return { + id, + title, + instruction, + timeoutMs: timeoutMs as number | undefined, + }; +} + +export function parseParts(content: string, maxParts: number): PartDefinition[] { + const parsed = parseJsonBlock(content); + if (!Array.isArray(parsed)) { + throw new Error('Team leader JSON must be an array'); + } + if (parsed.length === 0) { + throw new Error('Team leader JSON must contain at least one part'); + } + if (parsed.length > maxParts) { + throw new Error(`Team leader produced too many parts: ${parsed.length} > ${maxParts}`); + } + + const parts = parsed.map((entry, index) => parsePartEntry(entry, index)); + const ids = new Set(); + for (const part of parts) { + if (ids.has(part.id)) { + throw new Error(`Duplicate part id: ${part.id}`); + } + ids.add(part.id); + } + + return parts; +} diff --git a/src/infra/config/loaders/pieceParser.ts b/src/infra/config/loaders/pieceParser.ts index bd5f6df..1616800 100644 --- a/src/infra/config/loaders/pieceParser.ts +++ b/src/infra/config/loaders/pieceParser.ts @@ -10,7 +10,7 @@ import { dirname, resolve } from 'node:path'; import { parse as parseYaml } from 'yaml'; import type { z } from 'zod'; import { PieceConfigRawSchema, PieceMovementRawSchema } from '../../../core/models/index.js'; -import type { PieceConfig, PieceMovement, PieceRule, OutputContractEntry, OutputContractLabelPath, OutputContractItem, LoopMonitorConfig, LoopMonitorJudge, ArpeggioMovementConfig, ArpeggioMergeMovementConfig } from '../../../core/models/index.js'; +import type { PieceConfig, PieceMovement, PieceRule, OutputContractEntry, OutputContractLabelPath, OutputContractItem, LoopMonitorConfig, LoopMonitorJudge, ArpeggioMovementConfig, ArpeggioMergeMovementConfig, TeamLeaderConfig } from '../../../core/models/index.js'; import { getLanguage } from '../global/globalConfig.js'; import { type PieceSections, @@ -179,6 +179,31 @@ function normalizeArpeggio( }; } +/** Normalize raw team_leader config from YAML into internal format. */ +function normalizeTeamLeader( + raw: RawStep['team_leader'], + pieceDir: string, + sections: PieceSections, + context?: FacetResolutionContext, +): TeamLeaderConfig | undefined { + if (!raw) return undefined; + + const { personaSpec, personaPath } = resolvePersona(raw.persona, sections, pieceDir, context); + const { personaSpec: partPersona, personaPath: partPersonaPath } = resolvePersona(raw.part_persona, sections, pieceDir, context); + + return { + persona: personaSpec, + personaPath, + maxParts: raw.max_parts, + timeoutMs: raw.timeout_ms, + partPersona, + partPersonaPath, + partAllowedTools: raw.part_allowed_tools, + partEdit: raw.part_edit, + partPermissionMode: raw.part_permission_mode, + }; +} + /** Normalize a raw step into internal PieceMovement format. */ function normalizeStepFromRaw( step: RawStep, @@ -237,6 +262,11 @@ function normalizeStepFromRaw( result.arpeggio = arpeggioConfig; } + const teamLeaderConfig = normalizeTeamLeader(step.team_leader, pieceDir, sections, context); + if (teamLeaderConfig) { + result.teamLeader = teamLeaderConfig; + } + return result; }