diff --git a/src/__tests__/runAllTasks-concurrency.test.ts b/src/__tests__/runAllTasks-concurrency.test.ts new file mode 100644 index 0000000..3ff767a --- /dev/null +++ b/src/__tests__/runAllTasks-concurrency.test.ts @@ -0,0 +1,423 @@ +/** + * Tests for runAllTasks concurrency support + */ + +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import type { TaskInfo } from '../infra/task/index.js'; + +// Mock dependencies before importing the module under test +vi.mock('../infra/config/index.js', () => ({ + loadPieceByIdentifier: vi.fn(), + isPiecePath: vi.fn(() => false), + loadGlobalConfig: vi.fn(() => ({ + language: 'en', + defaultPiece: 'default', + logLevel: 'info', + concurrency: 1, + })), +})); + +import { loadGlobalConfig } from '../infra/config/index.js'; +const mockLoadGlobalConfig = vi.mocked(loadGlobalConfig); + +const mockGetNextTask = vi.fn(); +const mockGetNextTasks = vi.fn(); +const mockCompleteTask = vi.fn(); +const mockFailTask = vi.fn(); + +vi.mock('../infra/task/index.js', async (importOriginal) => ({ + ...(await importOriginal>()), + TaskRunner: vi.fn().mockImplementation(() => ({ + getNextTask: mockGetNextTask, + getNextTasks: mockGetNextTasks, + completeTask: mockCompleteTask, + failTask: mockFailTask, + })), +})); + +vi.mock('../infra/task/clone.js', async (importOriginal) => ({ + ...(await importOriginal>()), + createSharedClone: vi.fn(), + removeClone: vi.fn(), +})); + +vi.mock('../infra/task/git.js', async (importOriginal) => ({ + ...(await importOriginal>()), + getCurrentBranch: vi.fn(() => 'main'), +})); + +vi.mock('../infra/task/autoCommit.js', async (importOriginal) => ({ + ...(await importOriginal>()), + autoCommitAndPush: vi.fn(), +})); + +vi.mock('../infra/task/summarize.js', async (importOriginal) => ({ + ...(await importOriginal>()), + summarizeTaskName: vi.fn(), +})); + +vi.mock('../shared/ui/index.js', () => ({ + header: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + success: vi.fn(), + status: vi.fn(), + blankLine: vi.fn(), +})); + +vi.mock('../shared/utils/index.js', async (importOriginal) => ({ + ...(await importOriginal>()), + createLogger: () => ({ + info: vi.fn(), + debug: vi.fn(), + error: vi.fn(), + }), + getErrorMessage: vi.fn((e) => e.message), +})); + +vi.mock('../features/tasks/execute/pieceExecution.js', () => ({ + executePiece: vi.fn(() => Promise.resolve({ success: true })), +})); + +vi.mock('../shared/context.js', () => ({ + isQuietMode: vi.fn(() => false), +})); + +vi.mock('../shared/constants.js', () => ({ + DEFAULT_PIECE_NAME: 'default', + DEFAULT_LANGUAGE: 'en', +})); + +vi.mock('../infra/github/index.js', () => ({ + createPullRequest: vi.fn(), + buildPrBody: vi.fn(), + pushBranch: vi.fn(), +})); + +vi.mock('../infra/claude/index.js', () => ({ + interruptAllQueries: vi.fn(), + callAiJudge: vi.fn(), + detectRuleIndex: vi.fn(), +})); + +vi.mock('../shared/exitCodes.js', () => ({ + EXIT_SIGINT: 130, +})); + +vi.mock('../shared/i18n/index.js', () => ({ + getLabel: vi.fn((key: string) => key), +})); + +import { info, header, status, success, error as errorFn } from '../shared/ui/index.js'; +import { runAllTasks } from '../features/tasks/index.js'; +import { executePiece } from '../features/tasks/execute/pieceExecution.js'; +import { loadPieceByIdentifier } from '../infra/config/index.js'; + +const mockInfo = vi.mocked(info); +const mockHeader = vi.mocked(header); +const mockStatus = vi.mocked(status); +const mockSuccess = vi.mocked(success); +const mockError = vi.mocked(errorFn); +const mockExecutePiece = vi.mocked(executePiece); +const mockLoadPieceByIdentifier = vi.mocked(loadPieceByIdentifier); + +function createTask(name: string): TaskInfo { + return { + name, + content: `Task: ${name}`, + filePath: `/tasks/${name}.yaml`, + }; +} + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe('runAllTasks concurrency', () => { + describe('sequential execution (concurrency=1)', () => { + beforeEach(() => { + mockLoadGlobalConfig.mockReturnValue({ + language: 'en', + defaultPiece: 'default', + logLevel: 'info', + concurrency: 1, + }); + }); + + it('should show no-tasks message when no tasks exist', async () => { + // Given: No pending tasks + mockGetNextTasks.mockReturnValue([]); + + // When + await runAllTasks('/project'); + + // Then + expect(mockInfo).toHaveBeenCalledWith('No pending tasks in .takt/tasks/'); + }); + + it('should execute tasks sequentially when concurrency is 1', async () => { + // Given: Two tasks available sequentially + const task1 = createTask('task-1'); + const task2 = createTask('task-2'); + + mockGetNextTasks.mockReturnValueOnce([task1]); + mockGetNextTask + .mockReturnValueOnce(task2) + .mockReturnValueOnce(null); + + // When + await runAllTasks('/project'); + + // Then: Sequential execution uses getNextTask in the while loop + expect(mockGetNextTask).toHaveBeenCalled(); + expect(mockStatus).toHaveBeenCalledWith('Total', '2'); + }); + }); + + describe('parallel execution (concurrency>1)', () => { + beforeEach(() => { + mockLoadGlobalConfig.mockReturnValue({ + language: 'en', + defaultPiece: 'default', + logLevel: 'info', + concurrency: 3, + }); + }); + + it('should display concurrency info when concurrency > 1', async () => { + // Given: Tasks available + const task1 = createTask('task-1'); + mockGetNextTasks + .mockReturnValueOnce([task1]) + .mockReturnValueOnce([]); + + // When + await runAllTasks('/project'); + + // Then + expect(mockInfo).toHaveBeenCalledWith('Concurrency: 3'); + }); + + it('should execute tasks in batch when concurrency > 1', async () => { + // Given: 3 tasks available in first batch + const task1 = createTask('task-1'); + const task2 = createTask('task-2'); + const task3 = createTask('task-3'); + + mockGetNextTasks + .mockReturnValueOnce([task1, task2, task3]) + .mockReturnValueOnce([]); + + // When + await runAllTasks('/project'); + + // Then: Batch info shown + expect(mockInfo).toHaveBeenCalledWith('=== Running batch of 3 task(s) ==='); + expect(mockStatus).toHaveBeenCalledWith('Total', '3'); + }); + + it('should process multiple batches', async () => { + // Given: 5 tasks, concurrency=3 → batch1 (3 tasks), batch2 (2 tasks) + const tasks = Array.from({ length: 5 }, (_, i) => createTask(`task-${i + 1}`)); + + mockGetNextTasks + .mockReturnValueOnce(tasks.slice(0, 3)) + .mockReturnValueOnce(tasks.slice(3, 5)) + .mockReturnValueOnce([]); + + // When + await runAllTasks('/project'); + + // Then: Both batches shown + expect(mockInfo).toHaveBeenCalledWith('=== Running batch of 3 task(s) ==='); + expect(mockInfo).toHaveBeenCalledWith('=== Running batch of 2 task(s) ==='); + expect(mockStatus).toHaveBeenCalledWith('Total', '5'); + }); + + it('should not use getNextTask in parallel mode', async () => { + // Given: Tasks in parallel mode + const task1 = createTask('task-1'); + mockGetNextTasks + .mockReturnValueOnce([task1]) + .mockReturnValueOnce([]); + + // When + await runAllTasks('/project'); + + // Then: getNextTask should not be called (parallel uses getNextTasks) + expect(mockGetNextTask).not.toHaveBeenCalled(); + }); + + it('should list task names in batch output', async () => { + // Given: Tasks with specific names + const task1 = createTask('auth-feature'); + const task2 = createTask('db-migration'); + + mockGetNextTasks + .mockReturnValueOnce([task1, task2]) + .mockReturnValueOnce([]); + + // When + await runAllTasks('/project'); + + // Then + expect(mockInfo).toHaveBeenCalledWith(' - auth-feature'); + expect(mockInfo).toHaveBeenCalledWith(' - db-migration'); + }); + }); + + describe('default concurrency', () => { + it('should default to sequential when concurrency is not set', async () => { + // Given: Config without explicit concurrency (defaults to 1) + mockLoadGlobalConfig.mockReturnValue({ + language: 'en', + defaultPiece: 'default', + logLevel: 'info', + concurrency: 1, + }); + + const task1 = createTask('task-1'); + mockGetNextTasks.mockReturnValueOnce([task1]); + mockGetNextTask.mockReturnValueOnce(null); + + // When + await runAllTasks('/project'); + + // Then: No concurrency info displayed + const concurrencyInfoCalls = mockInfo.mock.calls.filter( + (call) => typeof call[0] === 'string' && call[0].startsWith('Concurrency:') + ); + expect(concurrencyInfoCalls).toHaveLength(0); + }); + }); + + describe('parallel execution behavior', () => { + const fakePieceConfig = { + name: 'default', + movements: [{ name: 'implement', personaDisplayName: 'coder' }], + initialMovement: 'implement', + maxIterations: 10, + }; + + beforeEach(() => { + mockLoadGlobalConfig.mockReturnValue({ + language: 'en', + defaultPiece: 'default', + logLevel: 'info', + concurrency: 3, + }); + // Return a valid piece config so executeTask reaches executePiece + mockLoadPieceByIdentifier.mockReturnValue(fakePieceConfig as never); + }); + + it('should run batch tasks concurrently, not sequentially', async () => { + // Given: 2 tasks with delayed execution to verify concurrency + const task1 = createTask('slow-1'); + const task2 = createTask('slow-2'); + + const executionOrder: string[] = []; + + // Each task takes 50ms — if sequential, total > 100ms; if parallel, total ~50ms + mockExecutePiece.mockImplementation((_config, task) => { + executionOrder.push(`start:${task}`); + return new Promise((resolve) => { + setTimeout(() => { + executionOrder.push(`end:${task}`); + resolve({ success: true }); + }, 50); + }); + }); + + mockGetNextTasks + .mockReturnValueOnce([task1, task2]) + .mockReturnValueOnce([]); + + // When + const startTime = Date.now(); + await runAllTasks('/project'); + const elapsed = Date.now() - startTime; + + // Then: Both tasks started before either completed (concurrent execution) + expect(executionOrder[0]).toBe('start:Task: slow-1'); + expect(executionOrder[1]).toBe('start:Task: slow-2'); + // Elapsed time should be closer to 50ms than 100ms (allowing margin for CI) + expect(elapsed).toBeLessThan(150); + }); + + it('should count partial failures correctly in a batch', async () => { + // Given: 3 tasks, 1 fails, 2 succeed + const task1 = createTask('pass-1'); + const task2 = createTask('fail-1'); + const task3 = createTask('pass-2'); + + let callIndex = 0; + mockExecutePiece.mockImplementation(() => { + callIndex++; + // Second call fails + return Promise.resolve({ success: callIndex !== 2 }); + }); + + mockGetNextTasks + .mockReturnValueOnce([task1, task2, task3]) + .mockReturnValueOnce([]); + + // When + await runAllTasks('/project'); + + // Then: Correct success/fail counts + expect(mockStatus).toHaveBeenCalledWith('Total', '3'); + expect(mockStatus).toHaveBeenCalledWith('Success', '2', undefined); + expect(mockStatus).toHaveBeenCalledWith('Failed', '1', 'red'); + }); + + it('should pass abortSignal and quiet=true to executePiece in parallel mode', async () => { + // Given: One task in parallel mode + const task1 = createTask('parallel-task'); + + mockExecutePiece.mockResolvedValue({ success: true }); + + mockGetNextTasks + .mockReturnValueOnce([task1]) + .mockReturnValueOnce([]); + + // When + await runAllTasks('/project'); + + // Then: executePiece received abortSignal and quiet options + expect(mockExecutePiece).toHaveBeenCalledTimes(1); + const callArgs = mockExecutePiece.mock.calls[0]; + const pieceOptions = callArgs?.[3]; // 4th argument is options + expect(pieceOptions).toHaveProperty('abortSignal'); + expect(pieceOptions?.abortSignal).toBeInstanceOf(AbortSignal); + expect(pieceOptions).toHaveProperty('quiet', true); + }); + + it('should not pass abortSignal or quiet in sequential mode', async () => { + // Given: Sequential mode + mockLoadGlobalConfig.mockReturnValue({ + language: 'en', + defaultPiece: 'default', + logLevel: 'info', + concurrency: 1, + }); + + const task1 = createTask('sequential-task'); + mockExecutePiece.mockResolvedValue({ success: true }); + mockLoadPieceByIdentifier.mockReturnValue(fakePieceConfig as never); + + mockGetNextTasks.mockReturnValueOnce([task1]); + mockGetNextTask.mockReturnValueOnce(null); + + // When + await runAllTasks('/project'); + + // Then: executePiece should not have abortSignal or quiet + expect(mockExecutePiece).toHaveBeenCalledTimes(1); + const callArgs = mockExecutePiece.mock.calls[0]; + const pieceOptions = callArgs?.[3]; + expect(pieceOptions?.abortSignal).toBeUndefined(); + expect(pieceOptions?.quiet).toBeFalsy(); + }); + }); +}); diff --git a/src/__tests__/task.test.ts b/src/__tests__/task.test.ts index 20ce2c8..4b16813 100644 --- a/src/__tests__/task.test.ts +++ b/src/__tests__/task.test.ts @@ -144,6 +144,52 @@ describe('TaskRunner', () => { }); }); + describe('getNextTasks', () => { + it('should return empty array when no tasks', () => { + const tasks = runner.getNextTasks(3); + expect(tasks).toEqual([]); + }); + + it('should return all tasks when count exceeds available tasks', () => { + const tasksDir = join(testDir, '.takt', 'tasks'); + mkdirSync(tasksDir, { recursive: true }); + writeFileSync(join(tasksDir, 'b-task.md'), 'B'); + writeFileSync(join(tasksDir, 'a-task.md'), 'A'); + + const tasks = runner.getNextTasks(5); + expect(tasks).toHaveLength(2); + expect(tasks[0]?.name).toBe('a-task'); + expect(tasks[1]?.name).toBe('b-task'); + }); + + it('should return only count tasks when more are available', () => { + const tasksDir = join(testDir, '.takt', 'tasks'); + mkdirSync(tasksDir, { recursive: true }); + writeFileSync(join(tasksDir, 'c-task.md'), 'C'); + writeFileSync(join(tasksDir, 'b-task.md'), 'B'); + writeFileSync(join(tasksDir, 'a-task.md'), 'A'); + + const tasks = runner.getNextTasks(2); + expect(tasks).toHaveLength(2); + expect(tasks[0]?.name).toBe('a-task'); + expect(tasks[1]?.name).toBe('b-task'); + }); + + it('should return tasks in same sort order as getNextTask', () => { + const tasksDir = join(testDir, '.takt', 'tasks'); + mkdirSync(tasksDir, { recursive: true }); + writeFileSync(join(tasksDir, '02-second.md'), 'Second'); + writeFileSync(join(tasksDir, '01-first.md'), 'First'); + writeFileSync(join(tasksDir, '03-third.md'), 'Third'); + + const nextTask = runner.getNextTask(); + const nextTasks = runner.getNextTasks(1); + + expect(nextTasks).toHaveLength(1); + expect(nextTasks[0]?.name).toBe(nextTask?.name); + }); + }); + describe('completeTask', () => { it('should move task to completed directory', () => { const tasksDir = join(testDir, '.takt', 'tasks'); diff --git a/src/core/models/global-config.ts b/src/core/models/global-config.ts index 38d39c6..9375234 100644 --- a/src/core/models/global-config.ts +++ b/src/core/models/global-config.ts @@ -67,6 +67,8 @@ export interface GlobalConfig { preventSleep?: boolean; /** Enable notification sounds (default: true when undefined) */ notificationSound?: boolean; + /** Number of tasks to run concurrently in takt run (default: 1 = sequential) */ + concurrency: number; } /** Project-level configuration */ diff --git a/src/core/models/schemas.ts b/src/core/models/schemas.ts index 6669d69..8e50eb0 100644 --- a/src/core/models/schemas.ts +++ b/src/core/models/schemas.ts @@ -318,6 +318,8 @@ export const GlobalConfigSchema = z.object({ prevent_sleep: z.boolean().optional(), /** Enable notification sounds (default: true when undefined) */ notification_sound: z.boolean().optional(), + /** Number of tasks to run concurrently in takt run (default: 1 = sequential, max: 10) */ + concurrency: z.number().int().min(1).max(10).optional().default(1), }); /** Project config schema */ diff --git a/src/features/tasks/execute/parallelExecution.ts b/src/features/tasks/execute/parallelExecution.ts new file mode 100644 index 0000000..bd2a984 --- /dev/null +++ b/src/features/tasks/execute/parallelExecution.ts @@ -0,0 +1,75 @@ +/** + * Parallel task execution strategy. + * + * Runs tasks in batches of up to `concurrency` tasks at a time. + * Uses a single AbortController shared across all tasks in all batches. + */ + +import type { TaskRunner, TaskInfo } from '../../../infra/task/index.js'; +import { info, blankLine } from '../../../shared/ui/index.js'; +import { executeAndCompleteTask } from './taskExecution.js'; +import { installSigIntHandler } from './sigintHandler.js'; +import type { TaskExecutionOptions } from './types.js'; + +interface BatchResult { + success: number; + fail: number; +} + +/** + * Run tasks in parallel batches. + * + * @returns Aggregated success/fail counts across all batches + */ +export async function runParallel( + taskRunner: TaskRunner, + initialTasks: TaskInfo[], + concurrency: number, + cwd: string, + pieceName: string, + options?: TaskExecutionOptions, +): Promise { + const abortController = new AbortController(); + const { cleanup } = installSigIntHandler(() => abortController.abort()); + + let successCount = 0; + let failCount = 0; + + try { + let batch = initialTasks; + while (batch.length > 0) { + blankLine(); + info(`=== Running batch of ${batch.length} task(s) ===`); + for (const task of batch) { + info(` - ${task.name}`); + } + blankLine(); + + const results = await Promise.all( + batch.map((task) => + executeAndCompleteTask(task, taskRunner, cwd, pieceName, options, { + abortSignal: abortController.signal, + }), + ), + ); + + for (const taskSuccess of results) { + if (taskSuccess) { + successCount++; + } else { + failCount++; + } + } + + if (abortController.signal.aborted) { + break; + } + + batch = taskRunner.getNextTasks(concurrency); + } + } finally { + cleanup(); + } + + return { success: successCount, fail: failCount }; +} diff --git a/src/features/tasks/execute/pieceExecution.ts b/src/features/tasks/execute/pieceExecution.ts index 28ac91e..b777ece 100644 --- a/src/features/tasks/execute/pieceExecution.ts +++ b/src/features/tasks/execute/pieceExecution.ts @@ -57,8 +57,8 @@ import { } from '../../../shared/utils/index.js'; import type { PromptLogRecord } from '../../../shared/utils/index.js'; import { selectOption, promptInput } from '../../../shared/prompt/index.js'; -import { EXIT_SIGINT } from '../../../shared/exitCodes.js'; import { getLabel } from '../../../shared/i18n/index.js'; +import { installSigIntHandler } from './sigintHandler.js'; const log = createLogger('piece'); @@ -322,8 +322,9 @@ export async function executePiece( const movementIndex = pieceConfig.movements.findIndex((m) => m.name === step.name); const totalMovements = pieceConfig.movements.length; - // Use quiet mode from CLI (already resolved CLI flag + config in preAction) - displayRef.current = new StreamDisplay(step.personaDisplayName, isQuietMode(), { + // Use quiet mode: forced quiet in parallel execution, or CLI/config setting + const quiet = options.quiet === true || isQuietMode(); + displayRef.current = new StreamDisplay(step.personaDisplayName, quiet, { iteration, maxIterations: pieceConfig.maxIterations, movementIndex: movementIndex >= 0 ? movementIndex : 0, @@ -506,23 +507,25 @@ export async function executePiece( throw err; }; - // SIGINT handler: 1st Ctrl+C = graceful abort, 2nd = force exit - let sigintCount = 0; - const onSigInt = () => { - sigintCount++; - if (sigintCount === 1) { - blankLine(); - warn(getLabel('piece.sigintGraceful')); - process.on('uncaughtException', onEpipe); - interruptAllQueries(); - engine.abort(); - } else { - blankLine(); - error(getLabel('piece.sigintForce')); - process.exit(EXIT_SIGINT); - } + const abortEngine = () => { + process.on('uncaughtException', onEpipe); + interruptAllQueries(); + engine.abort(); }; - process.on('SIGINT', onSigInt); + + // SIGINT handling: when abortSignal is provided (parallel mode), delegate to caller + const useExternalAbort = Boolean(options.abortSignal); + + let onAbortSignal: (() => void) | undefined; + let sigintCleanup: (() => void) | undefined; + + if (useExternalAbort) { + onAbortSignal = abortEngine; + options.abortSignal!.addEventListener('abort', onAbortSignal, { once: true }); + } else { + const handler = installSigIntHandler(abortEngine); + sigintCleanup = handler.cleanup; + } try { const finalState = await engine.run(); @@ -532,7 +535,10 @@ export async function executePiece( reason: abortReason, }; } finally { - process.removeListener('SIGINT', onSigInt); + sigintCleanup?.(); + if (onAbortSignal && options.abortSignal) { + options.abortSignal.removeEventListener('abort', onAbortSignal); + } process.removeListener('uncaughtException', onEpipe); } } diff --git a/src/features/tasks/execute/resolveTask.ts b/src/features/tasks/execute/resolveTask.ts new file mode 100644 index 0000000..3ae6e93 --- /dev/null +++ b/src/features/tasks/execute/resolveTask.ts @@ -0,0 +1,72 @@ +/** + * Resolve execution directory and piece from task data. + */ + +import { loadGlobalConfig } from '../../../infra/config/index.js'; +import { type TaskInfo, createSharedClone, summarizeTaskName, getCurrentBranch } from '../../../infra/task/index.js'; +import { info } from '../../../shared/ui/index.js'; + +export interface ResolvedTaskExecution { + execCwd: string; + execPiece: string; + isWorktree: boolean; + branch?: string; + baseBranch?: string; + startMovement?: string; + retryNote?: string; + autoPr?: boolean; +} + +/** + * Resolve execution directory and piece from task data. + * If the task has worktree settings, create a shared clone and use it as cwd. + * Task name is summarized to English by AI for use in branch/clone names. + */ +export async function resolveTaskExecution( + task: TaskInfo, + defaultCwd: string, + defaultPiece: string, +): Promise { + const data = task.data; + + if (!data) { + return { execCwd: defaultCwd, execPiece: defaultPiece, isWorktree: false }; + } + + let execCwd = defaultCwd; + let isWorktree = false; + let branch: string | undefined; + let baseBranch: string | undefined; + + if (data.worktree) { + baseBranch = getCurrentBranch(defaultCwd); + info('Generating branch name...'); + const taskSlug = await summarizeTaskName(task.content, { cwd: defaultCwd }); + + info('Creating clone...'); + const result = createSharedClone(defaultCwd, { + worktree: data.worktree, + branch: data.branch, + taskSlug, + issueNumber: data.issue, + }); + execCwd = result.path; + branch = result.branch; + isWorktree = true; + info(`Clone created: ${result.path} (branch: ${result.branch})`); + } + + const execPiece = data.piece || defaultPiece; + const startMovement = data.start_movement; + const retryNote = data.retry_note; + + let autoPr: boolean | undefined; + if (data.auto_pr !== undefined) { + autoPr = data.auto_pr; + } else { + const globalConfig = loadGlobalConfig(); + autoPr = globalConfig.autoPr; + } + + return { execCwd, execPiece, isWorktree, branch, baseBranch, startMovement, retryNote, autoPr }; +} diff --git a/src/features/tasks/execute/sigintHandler.ts b/src/features/tasks/execute/sigintHandler.ts new file mode 100644 index 0000000..1b8f028 --- /dev/null +++ b/src/features/tasks/execute/sigintHandler.ts @@ -0,0 +1,32 @@ +/** + * Shared SIGINT handler for graceful/force shutdown pattern. + * + * 1st Ctrl+C = graceful abort via onAbort callback + * 2nd Ctrl+C = force exit + */ + +import { blankLine, warn, error } from '../../../shared/ui/index.js'; +import { EXIT_SIGINT } from '../../../shared/exitCodes.js'; +import { getLabel } from '../../../shared/i18n/index.js'; + +interface SigIntHandler { + cleanup: () => void; +} + +export function installSigIntHandler(onAbort: () => void): SigIntHandler { + let sigintCount = 0; + const handler = () => { + sigintCount++; + if (sigintCount === 1) { + blankLine(); + warn(getLabel('piece.sigintGraceful')); + onAbort(); + } else { + blankLine(); + error(getLabel('piece.sigintForce')); + process.exit(EXIT_SIGINT); + } + }; + process.on('SIGINT', handler); + return { cleanup: () => process.removeListener('SIGINT', handler) }; +} diff --git a/src/features/tasks/execute/taskExecution.ts b/src/features/tasks/execute/taskExecution.ts index 74318a2..74cc71a 100644 --- a/src/features/tasks/execute/taskExecution.ts +++ b/src/features/tasks/execute/taskExecution.ts @@ -3,7 +3,7 @@ */ import { loadPieceByIdentifier, isPiecePath, loadGlobalConfig } from '../../../infra/config/index.js'; -import { TaskRunner, type TaskInfo, createSharedClone, autoCommitAndPush, summarizeTaskName, getCurrentBranch } from '../../../infra/task/index.js'; +import { TaskRunner, type TaskInfo, autoCommitAndPush } from '../../../infra/task/index.js'; import { header, info, @@ -17,6 +17,8 @@ import { executePiece } from './pieceExecution.js'; import { DEFAULT_PIECE_NAME } from '../../../shared/constants.js'; import type { TaskExecutionOptions, ExecuteTaskOptions } from './types.js'; import { createPullRequest, buildPrBody, pushBranch } from '../../../infra/github/index.js'; +import { runParallel } from './parallelExecution.js'; +import { resolveTaskExecution } from './resolveTask.js'; export type { TaskExecutionOptions, ExecuteTaskOptions }; @@ -26,7 +28,7 @@ const log = createLogger('task'); * Execute a single task with piece. */ export async function executeTask(options: ExecuteTaskOptions): Promise { - const { task, cwd, pieceIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata, startMovement, retryNote } = options; + const { task, cwd, pieceIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata, startMovement, retryNote, abortSignal, quiet } = options; const pieceConfig = loadPieceByIdentifier(pieceIdentifier, projectCwd); if (!pieceConfig) { @@ -55,6 +57,8 @@ export async function executeTask(options: ExecuteTaskOptions): Promise interactiveMetadata, startMovement, retryNote, + abortSignal, + quiet, }); return result.success; } @@ -73,6 +77,7 @@ export async function executeAndCompleteTask( cwd: string, pieceName: string, options?: TaskExecutionOptions, + parallelOptions?: { abortSignal: AbortSignal }, ): Promise { const startedAt = new Date().toISOString(); const executionLog: string[] = []; @@ -89,6 +94,8 @@ export async function executeAndCompleteTask( agentOverrides: options, startMovement, retryNote, + abortSignal: parallelOptions?.abortSignal, + quiet: parallelOptions !== undefined, }); const completedAt = new Date().toISOString(); @@ -161,32 +168,19 @@ export async function executeAndCompleteTask( } /** - * Run all pending tasks from .takt/tasks/ - * - * タスクを動的に取得する。各タスク実行前に次のタスクを取得するため、 - * 実行中にタスクファイルが追加・削除されても反映される。 + * Run tasks sequentially, fetching one at a time. */ -export async function runAllTasks( +async function runSequential( + taskRunner: TaskRunner, + initialTask: TaskInfo, cwd: string, - pieceName: string = DEFAULT_PIECE_NAME, + pieceName: string, options?: TaskExecutionOptions, -): Promise { - const taskRunner = new TaskRunner(cwd); - - // 最初のタスクを取得 - let task = taskRunner.getNextTask(); - - if (!task) { - info('No pending tasks in .takt/tasks/'); - info('Create task files as .takt/tasks/*.yaml or use takt add'); - return; - } - - header('Running tasks'); - +): Promise<{ success: number; fail: number }> { let successCount = 0; let failCount = 0; + let task: TaskInfo | undefined = initialTask; while (task) { blankLine(); info(`=== Task: ${task.name} ===`); @@ -200,79 +194,54 @@ export async function runAllTasks( failCount++; } - // 次のタスクを動的に取得(新しく追加されたタスクも含む) - task = taskRunner.getNextTask(); + task = taskRunner.getNextTask() ?? undefined; } - const totalCount = successCount + failCount; - blankLine(); - header('Tasks Summary'); - status('Total', String(totalCount)); - status('Success', String(successCount), successCount === totalCount ? 'green' : undefined); - if (failCount > 0) { - status('Failed', String(failCount), 'red'); - } + return { success: successCount, fail: failCount }; } /** - * Resolve execution directory and piece from task data. - * If the task has worktree settings, create a shared clone and use it as cwd. - * Task name is summarized to English by AI for use in branch/clone names. + * Run all pending tasks from .takt/tasks/ + * + * concurrency=1: 逐次実行(従来動作) + * concurrency=N (N>1): 最大N個のタスクをバッチ並列実行 */ -export async function resolveTaskExecution( - task: TaskInfo, - defaultCwd: string, - defaultPiece: string -): Promise<{ execCwd: string; execPiece: string; isWorktree: boolean; branch?: string; baseBranch?: string; startMovement?: string; retryNote?: string; autoPr?: boolean }> { - const data = task.data; +export async function runAllTasks( + cwd: string, + pieceName: string = DEFAULT_PIECE_NAME, + options?: TaskExecutionOptions, +): Promise { + const taskRunner = new TaskRunner(cwd); + const globalConfig = loadGlobalConfig(); + const concurrency = globalConfig.concurrency; - // No structured data: use defaults - if (!data) { - return { execCwd: defaultCwd, execPiece: defaultPiece, isWorktree: false }; + const initialTasks = taskRunner.getNextTasks(concurrency); + + if (initialTasks.length === 0) { + info('No pending tasks in .takt/tasks/'); + info('Create task files as .takt/tasks/*.yaml or use takt add'); + return; } - let execCwd = defaultCwd; - let isWorktree = false; - let branch: string | undefined; - let baseBranch: string | undefined; - - // Handle worktree (now creates a shared clone) - if (data.worktree) { - baseBranch = getCurrentBranch(defaultCwd); - // Summarize task content to English slug using AI - info('Generating branch name...'); - const taskSlug = await summarizeTaskName(task.content, { cwd: defaultCwd }); - - info('Creating clone...'); - const result = createSharedClone(defaultCwd, { - worktree: data.worktree, - branch: data.branch, - taskSlug, - issueNumber: data.issue, - }); - execCwd = result.path; - branch = result.branch; - isWorktree = true; - info(`Clone created: ${result.path} (branch: ${result.branch})`); + header('Running tasks'); + if (concurrency > 1) { + info(`Concurrency: ${concurrency}`); } - // Handle piece override - const execPiece = data.piece || defaultPiece; + // initialTasks is guaranteed non-empty at this point (early return above) + const result = concurrency <= 1 + ? await runSequential(taskRunner, initialTasks[0]!, cwd, pieceName, options) + : await runParallel(taskRunner, initialTasks, concurrency, cwd, pieceName, options); - // Handle start_movement override - const startMovement = data.start_movement; - - // Handle retry_note - const retryNote = data.retry_note; - - // Handle auto_pr (task YAML > global config) - let autoPr: boolean | undefined; - if (data.auto_pr !== undefined) { - autoPr = data.auto_pr; - } else { - const globalConfig = loadGlobalConfig(); - autoPr = globalConfig.autoPr; + const totalCount = result.success + result.fail; + blankLine(); + header('Tasks Summary'); + status('Total', String(totalCount)); + status('Success', String(result.success), result.success === totalCount ? 'green' : undefined); + if (result.fail > 0) { + status('Failed', String(result.fail), 'red'); } - - return { execCwd, execPiece, isWorktree, branch, baseBranch, startMovement, retryNote, autoPr }; } + +// Re-export for backward compatibility with existing consumers +export { resolveTaskExecution } from './resolveTask.js'; diff --git a/src/features/tasks/execute/types.ts b/src/features/tasks/execute/types.ts index a1355c1..198a8fa 100644 --- a/src/features/tasks/execute/types.ts +++ b/src/features/tasks/execute/types.ts @@ -38,6 +38,10 @@ export interface PieceExecutionOptions { startMovement?: string; /** Retry note explaining why task is being retried */ retryNote?: string; + /** External abort signal for parallel execution — when provided, SIGINT handling is delegated to caller */ + abortSignal?: AbortSignal; + /** Force quiet mode for streaming output (used in parallel execution to prevent interleaving) */ + quiet?: boolean; } export interface TaskExecutionOptions { @@ -64,6 +68,10 @@ export interface ExecuteTaskOptions { startMovement?: string; /** Retry note explaining why task is being retried */ retryNote?: string; + /** External abort signal for parallel execution — when provided, SIGINT handling is delegated to caller */ + abortSignal?: AbortSignal; + /** Force quiet mode for streaming output (used in parallel execution to prevent interleaving) */ + quiet?: boolean; } export interface PipelineExecutionOptions { diff --git a/src/infra/config/global/globalConfig.ts b/src/infra/config/global/globalConfig.ts index 0c390a5..4509853 100644 --- a/src/infra/config/global/globalConfig.ts +++ b/src/infra/config/global/globalConfig.ts @@ -35,6 +35,7 @@ function createDefaultGlobalConfig(): GlobalConfig { logLevel: 'info', provider: 'claude', enableBuiltinPieces: true, + concurrency: 1, }; } @@ -106,6 +107,7 @@ export class GlobalConfigManager { branchNameStrategy: parsed.branch_name_strategy, preventSleep: parsed.prevent_sleep, notificationSound: parsed.notification_sound, + concurrency: parsed.concurrency, }; validateProviderModelCompatibility(config.provider, config.model); this.cachedConfig = config; @@ -175,6 +177,9 @@ export class GlobalConfigManager { if (config.notificationSound !== undefined) { raw.notification_sound = config.notificationSound; } + if (config.concurrency !== undefined && config.concurrency > 1) { + raw.concurrency = config.concurrency; + } writeFileSync(configPath, stringifyYaml(raw), 'utf-8'); this.invalidateCache(); } diff --git a/src/infra/task/runner.ts b/src/infra/task/runner.ts index 6d34c36..9b4cf63 100644 --- a/src/infra/task/runner.ts +++ b/src/infra/task/runner.ts @@ -105,6 +105,14 @@ export class TaskRunner { return tasks[0] ?? null; } + /** + * 次に実行すべきタスクを指定数分取得 + */ + getNextTasks(count: number): TaskInfo[] { + const tasks = this.listTasks(); + return tasks.slice(0, count); + } + /** * タスクを完了としてマーク *