takt: github-issue-143-tasuku-takt-r
This commit is contained in:
parent
b5ec0762b6
commit
f324225565
423
src/__tests__/runAllTasks-concurrency.test.ts
Normal file
423
src/__tests__/runAllTasks-concurrency.test.ts
Normal file
@ -0,0 +1,423 @@
|
||||
/**
|
||||
* Tests for runAllTasks concurrency support
|
||||
*/
|
||||
|
||||
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||
import type { TaskInfo } from '../infra/task/index.js';
|
||||
|
||||
// Mock dependencies before importing the module under test
|
||||
vi.mock('../infra/config/index.js', () => ({
|
||||
loadPieceByIdentifier: vi.fn(),
|
||||
isPiecePath: vi.fn(() => false),
|
||||
loadGlobalConfig: vi.fn(() => ({
|
||||
language: 'en',
|
||||
defaultPiece: 'default',
|
||||
logLevel: 'info',
|
||||
concurrency: 1,
|
||||
})),
|
||||
}));
|
||||
|
||||
import { loadGlobalConfig } from '../infra/config/index.js';
|
||||
const mockLoadGlobalConfig = vi.mocked(loadGlobalConfig);
|
||||
|
||||
const mockGetNextTask = vi.fn();
|
||||
const mockGetNextTasks = vi.fn();
|
||||
const mockCompleteTask = vi.fn();
|
||||
const mockFailTask = vi.fn();
|
||||
|
||||
vi.mock('../infra/task/index.js', async (importOriginal) => ({
|
||||
...(await importOriginal<Record<string, unknown>>()),
|
||||
TaskRunner: vi.fn().mockImplementation(() => ({
|
||||
getNextTask: mockGetNextTask,
|
||||
getNextTasks: mockGetNextTasks,
|
||||
completeTask: mockCompleteTask,
|
||||
failTask: mockFailTask,
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock('../infra/task/clone.js', async (importOriginal) => ({
|
||||
...(await importOriginal<Record<string, unknown>>()),
|
||||
createSharedClone: vi.fn(),
|
||||
removeClone: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('../infra/task/git.js', async (importOriginal) => ({
|
||||
...(await importOriginal<Record<string, unknown>>()),
|
||||
getCurrentBranch: vi.fn(() => 'main'),
|
||||
}));
|
||||
|
||||
vi.mock('../infra/task/autoCommit.js', async (importOriginal) => ({
|
||||
...(await importOriginal<Record<string, unknown>>()),
|
||||
autoCommitAndPush: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('../infra/task/summarize.js', async (importOriginal) => ({
|
||||
...(await importOriginal<Record<string, unknown>>()),
|
||||
summarizeTaskName: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('../shared/ui/index.js', () => ({
|
||||
header: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
success: vi.fn(),
|
||||
status: vi.fn(),
|
||||
blankLine: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('../shared/utils/index.js', async (importOriginal) => ({
|
||||
...(await importOriginal<Record<string, unknown>>()),
|
||||
createLogger: () => ({
|
||||
info: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
error: vi.fn(),
|
||||
}),
|
||||
getErrorMessage: vi.fn((e) => e.message),
|
||||
}));
|
||||
|
||||
vi.mock('../features/tasks/execute/pieceExecution.js', () => ({
|
||||
executePiece: vi.fn(() => Promise.resolve({ success: true })),
|
||||
}));
|
||||
|
||||
vi.mock('../shared/context.js', () => ({
|
||||
isQuietMode: vi.fn(() => false),
|
||||
}));
|
||||
|
||||
vi.mock('../shared/constants.js', () => ({
|
||||
DEFAULT_PIECE_NAME: 'default',
|
||||
DEFAULT_LANGUAGE: 'en',
|
||||
}));
|
||||
|
||||
vi.mock('../infra/github/index.js', () => ({
|
||||
createPullRequest: vi.fn(),
|
||||
buildPrBody: vi.fn(),
|
||||
pushBranch: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('../infra/claude/index.js', () => ({
|
||||
interruptAllQueries: vi.fn(),
|
||||
callAiJudge: vi.fn(),
|
||||
detectRuleIndex: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('../shared/exitCodes.js', () => ({
|
||||
EXIT_SIGINT: 130,
|
||||
}));
|
||||
|
||||
vi.mock('../shared/i18n/index.js', () => ({
|
||||
getLabel: vi.fn((key: string) => key),
|
||||
}));
|
||||
|
||||
import { info, header, status, success, error as errorFn } from '../shared/ui/index.js';
|
||||
import { runAllTasks } from '../features/tasks/index.js';
|
||||
import { executePiece } from '../features/tasks/execute/pieceExecution.js';
|
||||
import { loadPieceByIdentifier } from '../infra/config/index.js';
|
||||
|
||||
const mockInfo = vi.mocked(info);
|
||||
const mockHeader = vi.mocked(header);
|
||||
const mockStatus = vi.mocked(status);
|
||||
const mockSuccess = vi.mocked(success);
|
||||
const mockError = vi.mocked(errorFn);
|
||||
const mockExecutePiece = vi.mocked(executePiece);
|
||||
const mockLoadPieceByIdentifier = vi.mocked(loadPieceByIdentifier);
|
||||
|
||||
function createTask(name: string): TaskInfo {
|
||||
return {
|
||||
name,
|
||||
content: `Task: ${name}`,
|
||||
filePath: `/tasks/${name}.yaml`,
|
||||
};
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('runAllTasks concurrency', () => {
|
||||
describe('sequential execution (concurrency=1)', () => {
|
||||
beforeEach(() => {
|
||||
mockLoadGlobalConfig.mockReturnValue({
|
||||
language: 'en',
|
||||
defaultPiece: 'default',
|
||||
logLevel: 'info',
|
||||
concurrency: 1,
|
||||
});
|
||||
});
|
||||
|
||||
it('should show no-tasks message when no tasks exist', async () => {
|
||||
// Given: No pending tasks
|
||||
mockGetNextTasks.mockReturnValue([]);
|
||||
|
||||
// When
|
||||
await runAllTasks('/project');
|
||||
|
||||
// Then
|
||||
expect(mockInfo).toHaveBeenCalledWith('No pending tasks in .takt/tasks/');
|
||||
});
|
||||
|
||||
it('should execute tasks sequentially when concurrency is 1', async () => {
|
||||
// Given: Two tasks available sequentially
|
||||
const task1 = createTask('task-1');
|
||||
const task2 = createTask('task-2');
|
||||
|
||||
mockGetNextTasks.mockReturnValueOnce([task1]);
|
||||
mockGetNextTask
|
||||
.mockReturnValueOnce(task2)
|
||||
.mockReturnValueOnce(null);
|
||||
|
||||
// When
|
||||
await runAllTasks('/project');
|
||||
|
||||
// Then: Sequential execution uses getNextTask in the while loop
|
||||
expect(mockGetNextTask).toHaveBeenCalled();
|
||||
expect(mockStatus).toHaveBeenCalledWith('Total', '2');
|
||||
});
|
||||
});
|
||||
|
||||
describe('parallel execution (concurrency>1)', () => {
|
||||
beforeEach(() => {
|
||||
mockLoadGlobalConfig.mockReturnValue({
|
||||
language: 'en',
|
||||
defaultPiece: 'default',
|
||||
logLevel: 'info',
|
||||
concurrency: 3,
|
||||
});
|
||||
});
|
||||
|
||||
it('should display concurrency info when concurrency > 1', async () => {
|
||||
// Given: Tasks available
|
||||
const task1 = createTask('task-1');
|
||||
mockGetNextTasks
|
||||
.mockReturnValueOnce([task1])
|
||||
.mockReturnValueOnce([]);
|
||||
|
||||
// When
|
||||
await runAllTasks('/project');
|
||||
|
||||
// Then
|
||||
expect(mockInfo).toHaveBeenCalledWith('Concurrency: 3');
|
||||
});
|
||||
|
||||
it('should execute tasks in batch when concurrency > 1', async () => {
|
||||
// Given: 3 tasks available in first batch
|
||||
const task1 = createTask('task-1');
|
||||
const task2 = createTask('task-2');
|
||||
const task3 = createTask('task-3');
|
||||
|
||||
mockGetNextTasks
|
||||
.mockReturnValueOnce([task1, task2, task3])
|
||||
.mockReturnValueOnce([]);
|
||||
|
||||
// When
|
||||
await runAllTasks('/project');
|
||||
|
||||
// Then: Batch info shown
|
||||
expect(mockInfo).toHaveBeenCalledWith('=== Running batch of 3 task(s) ===');
|
||||
expect(mockStatus).toHaveBeenCalledWith('Total', '3');
|
||||
});
|
||||
|
||||
it('should process multiple batches', async () => {
|
||||
// Given: 5 tasks, concurrency=3 → batch1 (3 tasks), batch2 (2 tasks)
|
||||
const tasks = Array.from({ length: 5 }, (_, i) => createTask(`task-${i + 1}`));
|
||||
|
||||
mockGetNextTasks
|
||||
.mockReturnValueOnce(tasks.slice(0, 3))
|
||||
.mockReturnValueOnce(tasks.slice(3, 5))
|
||||
.mockReturnValueOnce([]);
|
||||
|
||||
// When
|
||||
await runAllTasks('/project');
|
||||
|
||||
// Then: Both batches shown
|
||||
expect(mockInfo).toHaveBeenCalledWith('=== Running batch of 3 task(s) ===');
|
||||
expect(mockInfo).toHaveBeenCalledWith('=== Running batch of 2 task(s) ===');
|
||||
expect(mockStatus).toHaveBeenCalledWith('Total', '5');
|
||||
});
|
||||
|
||||
it('should not use getNextTask in parallel mode', async () => {
|
||||
// Given: Tasks in parallel mode
|
||||
const task1 = createTask('task-1');
|
||||
mockGetNextTasks
|
||||
.mockReturnValueOnce([task1])
|
||||
.mockReturnValueOnce([]);
|
||||
|
||||
// When
|
||||
await runAllTasks('/project');
|
||||
|
||||
// Then: getNextTask should not be called (parallel uses getNextTasks)
|
||||
expect(mockGetNextTask).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should list task names in batch output', async () => {
|
||||
// Given: Tasks with specific names
|
||||
const task1 = createTask('auth-feature');
|
||||
const task2 = createTask('db-migration');
|
||||
|
||||
mockGetNextTasks
|
||||
.mockReturnValueOnce([task1, task2])
|
||||
.mockReturnValueOnce([]);
|
||||
|
||||
// When
|
||||
await runAllTasks('/project');
|
||||
|
||||
// Then
|
||||
expect(mockInfo).toHaveBeenCalledWith(' - auth-feature');
|
||||
expect(mockInfo).toHaveBeenCalledWith(' - db-migration');
|
||||
});
|
||||
});
|
||||
|
||||
describe('default concurrency', () => {
|
||||
it('should default to sequential when concurrency is not set', async () => {
|
||||
// Given: Config without explicit concurrency (defaults to 1)
|
||||
mockLoadGlobalConfig.mockReturnValue({
|
||||
language: 'en',
|
||||
defaultPiece: 'default',
|
||||
logLevel: 'info',
|
||||
concurrency: 1,
|
||||
});
|
||||
|
||||
const task1 = createTask('task-1');
|
||||
mockGetNextTasks.mockReturnValueOnce([task1]);
|
||||
mockGetNextTask.mockReturnValueOnce(null);
|
||||
|
||||
// When
|
||||
await runAllTasks('/project');
|
||||
|
||||
// Then: No concurrency info displayed
|
||||
const concurrencyInfoCalls = mockInfo.mock.calls.filter(
|
||||
(call) => typeof call[0] === 'string' && call[0].startsWith('Concurrency:')
|
||||
);
|
||||
expect(concurrencyInfoCalls).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('parallel execution behavior', () => {
|
||||
const fakePieceConfig = {
|
||||
name: 'default',
|
||||
movements: [{ name: 'implement', personaDisplayName: 'coder' }],
|
||||
initialMovement: 'implement',
|
||||
maxIterations: 10,
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
mockLoadGlobalConfig.mockReturnValue({
|
||||
language: 'en',
|
||||
defaultPiece: 'default',
|
||||
logLevel: 'info',
|
||||
concurrency: 3,
|
||||
});
|
||||
// Return a valid piece config so executeTask reaches executePiece
|
||||
mockLoadPieceByIdentifier.mockReturnValue(fakePieceConfig as never);
|
||||
});
|
||||
|
||||
it('should run batch tasks concurrently, not sequentially', async () => {
|
||||
// Given: 2 tasks with delayed execution to verify concurrency
|
||||
const task1 = createTask('slow-1');
|
||||
const task2 = createTask('slow-2');
|
||||
|
||||
const executionOrder: string[] = [];
|
||||
|
||||
// Each task takes 50ms — if sequential, total > 100ms; if parallel, total ~50ms
|
||||
mockExecutePiece.mockImplementation((_config, task) => {
|
||||
executionOrder.push(`start:${task}`);
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(() => {
|
||||
executionOrder.push(`end:${task}`);
|
||||
resolve({ success: true });
|
||||
}, 50);
|
||||
});
|
||||
});
|
||||
|
||||
mockGetNextTasks
|
||||
.mockReturnValueOnce([task1, task2])
|
||||
.mockReturnValueOnce([]);
|
||||
|
||||
// When
|
||||
const startTime = Date.now();
|
||||
await runAllTasks('/project');
|
||||
const elapsed = Date.now() - startTime;
|
||||
|
||||
// Then: Both tasks started before either completed (concurrent execution)
|
||||
expect(executionOrder[0]).toBe('start:Task: slow-1');
|
||||
expect(executionOrder[1]).toBe('start:Task: slow-2');
|
||||
// Elapsed time should be closer to 50ms than 100ms (allowing margin for CI)
|
||||
expect(elapsed).toBeLessThan(150);
|
||||
});
|
||||
|
||||
it('should count partial failures correctly in a batch', async () => {
|
||||
// Given: 3 tasks, 1 fails, 2 succeed
|
||||
const task1 = createTask('pass-1');
|
||||
const task2 = createTask('fail-1');
|
||||
const task3 = createTask('pass-2');
|
||||
|
||||
let callIndex = 0;
|
||||
mockExecutePiece.mockImplementation(() => {
|
||||
callIndex++;
|
||||
// Second call fails
|
||||
return Promise.resolve({ success: callIndex !== 2 });
|
||||
});
|
||||
|
||||
mockGetNextTasks
|
||||
.mockReturnValueOnce([task1, task2, task3])
|
||||
.mockReturnValueOnce([]);
|
||||
|
||||
// When
|
||||
await runAllTasks('/project');
|
||||
|
||||
// Then: Correct success/fail counts
|
||||
expect(mockStatus).toHaveBeenCalledWith('Total', '3');
|
||||
expect(mockStatus).toHaveBeenCalledWith('Success', '2', undefined);
|
||||
expect(mockStatus).toHaveBeenCalledWith('Failed', '1', 'red');
|
||||
});
|
||||
|
||||
it('should pass abortSignal and quiet=true to executePiece in parallel mode', async () => {
|
||||
// Given: One task in parallel mode
|
||||
const task1 = createTask('parallel-task');
|
||||
|
||||
mockExecutePiece.mockResolvedValue({ success: true });
|
||||
|
||||
mockGetNextTasks
|
||||
.mockReturnValueOnce([task1])
|
||||
.mockReturnValueOnce([]);
|
||||
|
||||
// When
|
||||
await runAllTasks('/project');
|
||||
|
||||
// Then: executePiece received abortSignal and quiet options
|
||||
expect(mockExecutePiece).toHaveBeenCalledTimes(1);
|
||||
const callArgs = mockExecutePiece.mock.calls[0];
|
||||
const pieceOptions = callArgs?.[3]; // 4th argument is options
|
||||
expect(pieceOptions).toHaveProperty('abortSignal');
|
||||
expect(pieceOptions?.abortSignal).toBeInstanceOf(AbortSignal);
|
||||
expect(pieceOptions).toHaveProperty('quiet', true);
|
||||
});
|
||||
|
||||
it('should not pass abortSignal or quiet in sequential mode', async () => {
|
||||
// Given: Sequential mode
|
||||
mockLoadGlobalConfig.mockReturnValue({
|
||||
language: 'en',
|
||||
defaultPiece: 'default',
|
||||
logLevel: 'info',
|
||||
concurrency: 1,
|
||||
});
|
||||
|
||||
const task1 = createTask('sequential-task');
|
||||
mockExecutePiece.mockResolvedValue({ success: true });
|
||||
mockLoadPieceByIdentifier.mockReturnValue(fakePieceConfig as never);
|
||||
|
||||
mockGetNextTasks.mockReturnValueOnce([task1]);
|
||||
mockGetNextTask.mockReturnValueOnce(null);
|
||||
|
||||
// When
|
||||
await runAllTasks('/project');
|
||||
|
||||
// Then: executePiece should not have abortSignal or quiet
|
||||
expect(mockExecutePiece).toHaveBeenCalledTimes(1);
|
||||
const callArgs = mockExecutePiece.mock.calls[0];
|
||||
const pieceOptions = callArgs?.[3];
|
||||
expect(pieceOptions?.abortSignal).toBeUndefined();
|
||||
expect(pieceOptions?.quiet).toBeFalsy();
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -144,6 +144,52 @@ describe('TaskRunner', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('getNextTasks', () => {
|
||||
it('should return empty array when no tasks', () => {
|
||||
const tasks = runner.getNextTasks(3);
|
||||
expect(tasks).toEqual([]);
|
||||
});
|
||||
|
||||
it('should return all tasks when count exceeds available tasks', () => {
|
||||
const tasksDir = join(testDir, '.takt', 'tasks');
|
||||
mkdirSync(tasksDir, { recursive: true });
|
||||
writeFileSync(join(tasksDir, 'b-task.md'), 'B');
|
||||
writeFileSync(join(tasksDir, 'a-task.md'), 'A');
|
||||
|
||||
const tasks = runner.getNextTasks(5);
|
||||
expect(tasks).toHaveLength(2);
|
||||
expect(tasks[0]?.name).toBe('a-task');
|
||||
expect(tasks[1]?.name).toBe('b-task');
|
||||
});
|
||||
|
||||
it('should return only count tasks when more are available', () => {
|
||||
const tasksDir = join(testDir, '.takt', 'tasks');
|
||||
mkdirSync(tasksDir, { recursive: true });
|
||||
writeFileSync(join(tasksDir, 'c-task.md'), 'C');
|
||||
writeFileSync(join(tasksDir, 'b-task.md'), 'B');
|
||||
writeFileSync(join(tasksDir, 'a-task.md'), 'A');
|
||||
|
||||
const tasks = runner.getNextTasks(2);
|
||||
expect(tasks).toHaveLength(2);
|
||||
expect(tasks[0]?.name).toBe('a-task');
|
||||
expect(tasks[1]?.name).toBe('b-task');
|
||||
});
|
||||
|
||||
it('should return tasks in same sort order as getNextTask', () => {
|
||||
const tasksDir = join(testDir, '.takt', 'tasks');
|
||||
mkdirSync(tasksDir, { recursive: true });
|
||||
writeFileSync(join(tasksDir, '02-second.md'), 'Second');
|
||||
writeFileSync(join(tasksDir, '01-first.md'), 'First');
|
||||
writeFileSync(join(tasksDir, '03-third.md'), 'Third');
|
||||
|
||||
const nextTask = runner.getNextTask();
|
||||
const nextTasks = runner.getNextTasks(1);
|
||||
|
||||
expect(nextTasks).toHaveLength(1);
|
||||
expect(nextTasks[0]?.name).toBe(nextTask?.name);
|
||||
});
|
||||
});
|
||||
|
||||
describe('completeTask', () => {
|
||||
it('should move task to completed directory', () => {
|
||||
const tasksDir = join(testDir, '.takt', 'tasks');
|
||||
|
||||
@ -67,6 +67,8 @@ export interface GlobalConfig {
|
||||
preventSleep?: boolean;
|
||||
/** Enable notification sounds (default: true when undefined) */
|
||||
notificationSound?: boolean;
|
||||
/** Number of tasks to run concurrently in takt run (default: 1 = sequential) */
|
||||
concurrency: number;
|
||||
}
|
||||
|
||||
/** Project-level configuration */
|
||||
|
||||
@ -318,6 +318,8 @@ export const GlobalConfigSchema = z.object({
|
||||
prevent_sleep: z.boolean().optional(),
|
||||
/** Enable notification sounds (default: true when undefined) */
|
||||
notification_sound: z.boolean().optional(),
|
||||
/** Number of tasks to run concurrently in takt run (default: 1 = sequential, max: 10) */
|
||||
concurrency: z.number().int().min(1).max(10).optional().default(1),
|
||||
});
|
||||
|
||||
/** Project config schema */
|
||||
|
||||
75
src/features/tasks/execute/parallelExecution.ts
Normal file
75
src/features/tasks/execute/parallelExecution.ts
Normal file
@ -0,0 +1,75 @@
|
||||
/**
|
||||
* Parallel task execution strategy.
|
||||
*
|
||||
* Runs tasks in batches of up to `concurrency` tasks at a time.
|
||||
* Uses a single AbortController shared across all tasks in all batches.
|
||||
*/
|
||||
|
||||
import type { TaskRunner, TaskInfo } from '../../../infra/task/index.js';
|
||||
import { info, blankLine } from '../../../shared/ui/index.js';
|
||||
import { executeAndCompleteTask } from './taskExecution.js';
|
||||
import { installSigIntHandler } from './sigintHandler.js';
|
||||
import type { TaskExecutionOptions } from './types.js';
|
||||
|
||||
interface BatchResult {
|
||||
success: number;
|
||||
fail: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run tasks in parallel batches.
|
||||
*
|
||||
* @returns Aggregated success/fail counts across all batches
|
||||
*/
|
||||
export async function runParallel(
|
||||
taskRunner: TaskRunner,
|
||||
initialTasks: TaskInfo[],
|
||||
concurrency: number,
|
||||
cwd: string,
|
||||
pieceName: string,
|
||||
options?: TaskExecutionOptions,
|
||||
): Promise<BatchResult> {
|
||||
const abortController = new AbortController();
|
||||
const { cleanup } = installSigIntHandler(() => abortController.abort());
|
||||
|
||||
let successCount = 0;
|
||||
let failCount = 0;
|
||||
|
||||
try {
|
||||
let batch = initialTasks;
|
||||
while (batch.length > 0) {
|
||||
blankLine();
|
||||
info(`=== Running batch of ${batch.length} task(s) ===`);
|
||||
for (const task of batch) {
|
||||
info(` - ${task.name}`);
|
||||
}
|
||||
blankLine();
|
||||
|
||||
const results = await Promise.all(
|
||||
batch.map((task) =>
|
||||
executeAndCompleteTask(task, taskRunner, cwd, pieceName, options, {
|
||||
abortSignal: abortController.signal,
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
for (const taskSuccess of results) {
|
||||
if (taskSuccess) {
|
||||
successCount++;
|
||||
} else {
|
||||
failCount++;
|
||||
}
|
||||
}
|
||||
|
||||
if (abortController.signal.aborted) {
|
||||
break;
|
||||
}
|
||||
|
||||
batch = taskRunner.getNextTasks(concurrency);
|
||||
}
|
||||
} finally {
|
||||
cleanup();
|
||||
}
|
||||
|
||||
return { success: successCount, fail: failCount };
|
||||
}
|
||||
@ -57,8 +57,8 @@ import {
|
||||
} from '../../../shared/utils/index.js';
|
||||
import type { PromptLogRecord } from '../../../shared/utils/index.js';
|
||||
import { selectOption, promptInput } from '../../../shared/prompt/index.js';
|
||||
import { EXIT_SIGINT } from '../../../shared/exitCodes.js';
|
||||
import { getLabel } from '../../../shared/i18n/index.js';
|
||||
import { installSigIntHandler } from './sigintHandler.js';
|
||||
|
||||
const log = createLogger('piece');
|
||||
|
||||
@ -322,8 +322,9 @@ export async function executePiece(
|
||||
const movementIndex = pieceConfig.movements.findIndex((m) => m.name === step.name);
|
||||
const totalMovements = pieceConfig.movements.length;
|
||||
|
||||
// Use quiet mode from CLI (already resolved CLI flag + config in preAction)
|
||||
displayRef.current = new StreamDisplay(step.personaDisplayName, isQuietMode(), {
|
||||
// Use quiet mode: forced quiet in parallel execution, or CLI/config setting
|
||||
const quiet = options.quiet === true || isQuietMode();
|
||||
displayRef.current = new StreamDisplay(step.personaDisplayName, quiet, {
|
||||
iteration,
|
||||
maxIterations: pieceConfig.maxIterations,
|
||||
movementIndex: movementIndex >= 0 ? movementIndex : 0,
|
||||
@ -506,23 +507,25 @@ export async function executePiece(
|
||||
throw err;
|
||||
};
|
||||
|
||||
// SIGINT handler: 1st Ctrl+C = graceful abort, 2nd = force exit
|
||||
let sigintCount = 0;
|
||||
const onSigInt = () => {
|
||||
sigintCount++;
|
||||
if (sigintCount === 1) {
|
||||
blankLine();
|
||||
warn(getLabel('piece.sigintGraceful'));
|
||||
process.on('uncaughtException', onEpipe);
|
||||
interruptAllQueries();
|
||||
engine.abort();
|
||||
} else {
|
||||
blankLine();
|
||||
error(getLabel('piece.sigintForce'));
|
||||
process.exit(EXIT_SIGINT);
|
||||
}
|
||||
const abortEngine = () => {
|
||||
process.on('uncaughtException', onEpipe);
|
||||
interruptAllQueries();
|
||||
engine.abort();
|
||||
};
|
||||
process.on('SIGINT', onSigInt);
|
||||
|
||||
// SIGINT handling: when abortSignal is provided (parallel mode), delegate to caller
|
||||
const useExternalAbort = Boolean(options.abortSignal);
|
||||
|
||||
let onAbortSignal: (() => void) | undefined;
|
||||
let sigintCleanup: (() => void) | undefined;
|
||||
|
||||
if (useExternalAbort) {
|
||||
onAbortSignal = abortEngine;
|
||||
options.abortSignal!.addEventListener('abort', onAbortSignal, { once: true });
|
||||
} else {
|
||||
const handler = installSigIntHandler(abortEngine);
|
||||
sigintCleanup = handler.cleanup;
|
||||
}
|
||||
|
||||
try {
|
||||
const finalState = await engine.run();
|
||||
@ -532,7 +535,10 @@ export async function executePiece(
|
||||
reason: abortReason,
|
||||
};
|
||||
} finally {
|
||||
process.removeListener('SIGINT', onSigInt);
|
||||
sigintCleanup?.();
|
||||
if (onAbortSignal && options.abortSignal) {
|
||||
options.abortSignal.removeEventListener('abort', onAbortSignal);
|
||||
}
|
||||
process.removeListener('uncaughtException', onEpipe);
|
||||
}
|
||||
}
|
||||
|
||||
72
src/features/tasks/execute/resolveTask.ts
Normal file
72
src/features/tasks/execute/resolveTask.ts
Normal file
@ -0,0 +1,72 @@
|
||||
/**
|
||||
* Resolve execution directory and piece from task data.
|
||||
*/
|
||||
|
||||
import { loadGlobalConfig } from '../../../infra/config/index.js';
|
||||
import { type TaskInfo, createSharedClone, summarizeTaskName, getCurrentBranch } from '../../../infra/task/index.js';
|
||||
import { info } from '../../../shared/ui/index.js';
|
||||
|
||||
export interface ResolvedTaskExecution {
|
||||
execCwd: string;
|
||||
execPiece: string;
|
||||
isWorktree: boolean;
|
||||
branch?: string;
|
||||
baseBranch?: string;
|
||||
startMovement?: string;
|
||||
retryNote?: string;
|
||||
autoPr?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve execution directory and piece from task data.
|
||||
* If the task has worktree settings, create a shared clone and use it as cwd.
|
||||
* Task name is summarized to English by AI for use in branch/clone names.
|
||||
*/
|
||||
export async function resolveTaskExecution(
|
||||
task: TaskInfo,
|
||||
defaultCwd: string,
|
||||
defaultPiece: string,
|
||||
): Promise<ResolvedTaskExecution> {
|
||||
const data = task.data;
|
||||
|
||||
if (!data) {
|
||||
return { execCwd: defaultCwd, execPiece: defaultPiece, isWorktree: false };
|
||||
}
|
||||
|
||||
let execCwd = defaultCwd;
|
||||
let isWorktree = false;
|
||||
let branch: string | undefined;
|
||||
let baseBranch: string | undefined;
|
||||
|
||||
if (data.worktree) {
|
||||
baseBranch = getCurrentBranch(defaultCwd);
|
||||
info('Generating branch name...');
|
||||
const taskSlug = await summarizeTaskName(task.content, { cwd: defaultCwd });
|
||||
|
||||
info('Creating clone...');
|
||||
const result = createSharedClone(defaultCwd, {
|
||||
worktree: data.worktree,
|
||||
branch: data.branch,
|
||||
taskSlug,
|
||||
issueNumber: data.issue,
|
||||
});
|
||||
execCwd = result.path;
|
||||
branch = result.branch;
|
||||
isWorktree = true;
|
||||
info(`Clone created: ${result.path} (branch: ${result.branch})`);
|
||||
}
|
||||
|
||||
const execPiece = data.piece || defaultPiece;
|
||||
const startMovement = data.start_movement;
|
||||
const retryNote = data.retry_note;
|
||||
|
||||
let autoPr: boolean | undefined;
|
||||
if (data.auto_pr !== undefined) {
|
||||
autoPr = data.auto_pr;
|
||||
} else {
|
||||
const globalConfig = loadGlobalConfig();
|
||||
autoPr = globalConfig.autoPr;
|
||||
}
|
||||
|
||||
return { execCwd, execPiece, isWorktree, branch, baseBranch, startMovement, retryNote, autoPr };
|
||||
}
|
||||
32
src/features/tasks/execute/sigintHandler.ts
Normal file
32
src/features/tasks/execute/sigintHandler.ts
Normal file
@ -0,0 +1,32 @@
|
||||
/**
|
||||
* Shared SIGINT handler for graceful/force shutdown pattern.
|
||||
*
|
||||
* 1st Ctrl+C = graceful abort via onAbort callback
|
||||
* 2nd Ctrl+C = force exit
|
||||
*/
|
||||
|
||||
import { blankLine, warn, error } from '../../../shared/ui/index.js';
|
||||
import { EXIT_SIGINT } from '../../../shared/exitCodes.js';
|
||||
import { getLabel } from '../../../shared/i18n/index.js';
|
||||
|
||||
interface SigIntHandler {
|
||||
cleanup: () => void;
|
||||
}
|
||||
|
||||
export function installSigIntHandler(onAbort: () => void): SigIntHandler {
|
||||
let sigintCount = 0;
|
||||
const handler = () => {
|
||||
sigintCount++;
|
||||
if (sigintCount === 1) {
|
||||
blankLine();
|
||||
warn(getLabel('piece.sigintGraceful'));
|
||||
onAbort();
|
||||
} else {
|
||||
blankLine();
|
||||
error(getLabel('piece.sigintForce'));
|
||||
process.exit(EXIT_SIGINT);
|
||||
}
|
||||
};
|
||||
process.on('SIGINT', handler);
|
||||
return { cleanup: () => process.removeListener('SIGINT', handler) };
|
||||
}
|
||||
@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
import { loadPieceByIdentifier, isPiecePath, loadGlobalConfig } from '../../../infra/config/index.js';
|
||||
import { TaskRunner, type TaskInfo, createSharedClone, autoCommitAndPush, summarizeTaskName, getCurrentBranch } from '../../../infra/task/index.js';
|
||||
import { TaskRunner, type TaskInfo, autoCommitAndPush } from '../../../infra/task/index.js';
|
||||
import {
|
||||
header,
|
||||
info,
|
||||
@ -17,6 +17,8 @@ import { executePiece } from './pieceExecution.js';
|
||||
import { DEFAULT_PIECE_NAME } from '../../../shared/constants.js';
|
||||
import type { TaskExecutionOptions, ExecuteTaskOptions } from './types.js';
|
||||
import { createPullRequest, buildPrBody, pushBranch } from '../../../infra/github/index.js';
|
||||
import { runParallel } from './parallelExecution.js';
|
||||
import { resolveTaskExecution } from './resolveTask.js';
|
||||
|
||||
export type { TaskExecutionOptions, ExecuteTaskOptions };
|
||||
|
||||
@ -26,7 +28,7 @@ const log = createLogger('task');
|
||||
* Execute a single task with piece.
|
||||
*/
|
||||
export async function executeTask(options: ExecuteTaskOptions): Promise<boolean> {
|
||||
const { task, cwd, pieceIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata, startMovement, retryNote } = options;
|
||||
const { task, cwd, pieceIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata, startMovement, retryNote, abortSignal, quiet } = options;
|
||||
const pieceConfig = loadPieceByIdentifier(pieceIdentifier, projectCwd);
|
||||
|
||||
if (!pieceConfig) {
|
||||
@ -55,6 +57,8 @@ export async function executeTask(options: ExecuteTaskOptions): Promise<boolean>
|
||||
interactiveMetadata,
|
||||
startMovement,
|
||||
retryNote,
|
||||
abortSignal,
|
||||
quiet,
|
||||
});
|
||||
return result.success;
|
||||
}
|
||||
@ -73,6 +77,7 @@ export async function executeAndCompleteTask(
|
||||
cwd: string,
|
||||
pieceName: string,
|
||||
options?: TaskExecutionOptions,
|
||||
parallelOptions?: { abortSignal: AbortSignal },
|
||||
): Promise<boolean> {
|
||||
const startedAt = new Date().toISOString();
|
||||
const executionLog: string[] = [];
|
||||
@ -89,6 +94,8 @@ export async function executeAndCompleteTask(
|
||||
agentOverrides: options,
|
||||
startMovement,
|
||||
retryNote,
|
||||
abortSignal: parallelOptions?.abortSignal,
|
||||
quiet: parallelOptions !== undefined,
|
||||
});
|
||||
const completedAt = new Date().toISOString();
|
||||
|
||||
@ -161,32 +168,19 @@ export async function executeAndCompleteTask(
|
||||
}
|
||||
|
||||
/**
|
||||
* Run all pending tasks from .takt/tasks/
|
||||
*
|
||||
* タスクを動的に取得する。各タスク実行前に次のタスクを取得するため、
|
||||
* 実行中にタスクファイルが追加・削除されても反映される。
|
||||
* Run tasks sequentially, fetching one at a time.
|
||||
*/
|
||||
export async function runAllTasks(
|
||||
async function runSequential(
|
||||
taskRunner: TaskRunner,
|
||||
initialTask: TaskInfo,
|
||||
cwd: string,
|
||||
pieceName: string = DEFAULT_PIECE_NAME,
|
||||
pieceName: string,
|
||||
options?: TaskExecutionOptions,
|
||||
): Promise<void> {
|
||||
const taskRunner = new TaskRunner(cwd);
|
||||
|
||||
// 最初のタスクを取得
|
||||
let task = taskRunner.getNextTask();
|
||||
|
||||
if (!task) {
|
||||
info('No pending tasks in .takt/tasks/');
|
||||
info('Create task files as .takt/tasks/*.yaml or use takt add');
|
||||
return;
|
||||
}
|
||||
|
||||
header('Running tasks');
|
||||
|
||||
): Promise<{ success: number; fail: number }> {
|
||||
let successCount = 0;
|
||||
let failCount = 0;
|
||||
|
||||
let task: TaskInfo | undefined = initialTask;
|
||||
while (task) {
|
||||
blankLine();
|
||||
info(`=== Task: ${task.name} ===`);
|
||||
@ -200,79 +194,54 @@ export async function runAllTasks(
|
||||
failCount++;
|
||||
}
|
||||
|
||||
// 次のタスクを動的に取得(新しく追加されたタスクも含む)
|
||||
task = taskRunner.getNextTask();
|
||||
task = taskRunner.getNextTask() ?? undefined;
|
||||
}
|
||||
|
||||
const totalCount = successCount + failCount;
|
||||
blankLine();
|
||||
header('Tasks Summary');
|
||||
status('Total', String(totalCount));
|
||||
status('Success', String(successCount), successCount === totalCount ? 'green' : undefined);
|
||||
if (failCount > 0) {
|
||||
status('Failed', String(failCount), 'red');
|
||||
}
|
||||
return { success: successCount, fail: failCount };
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve execution directory and piece from task data.
|
||||
* If the task has worktree settings, create a shared clone and use it as cwd.
|
||||
* Task name is summarized to English by AI for use in branch/clone names.
|
||||
* Run all pending tasks from .takt/tasks/
|
||||
*
|
||||
* concurrency=1: 逐次実行(従来動作)
|
||||
* concurrency=N (N>1): 最大N個のタスクをバッチ並列実行
|
||||
*/
|
||||
export async function resolveTaskExecution(
|
||||
task: TaskInfo,
|
||||
defaultCwd: string,
|
||||
defaultPiece: string
|
||||
): Promise<{ execCwd: string; execPiece: string; isWorktree: boolean; branch?: string; baseBranch?: string; startMovement?: string; retryNote?: string; autoPr?: boolean }> {
|
||||
const data = task.data;
|
||||
export async function runAllTasks(
|
||||
cwd: string,
|
||||
pieceName: string = DEFAULT_PIECE_NAME,
|
||||
options?: TaskExecutionOptions,
|
||||
): Promise<void> {
|
||||
const taskRunner = new TaskRunner(cwd);
|
||||
const globalConfig = loadGlobalConfig();
|
||||
const concurrency = globalConfig.concurrency;
|
||||
|
||||
// No structured data: use defaults
|
||||
if (!data) {
|
||||
return { execCwd: defaultCwd, execPiece: defaultPiece, isWorktree: false };
|
||||
const initialTasks = taskRunner.getNextTasks(concurrency);
|
||||
|
||||
if (initialTasks.length === 0) {
|
||||
info('No pending tasks in .takt/tasks/');
|
||||
info('Create task files as .takt/tasks/*.yaml or use takt add');
|
||||
return;
|
||||
}
|
||||
|
||||
let execCwd = defaultCwd;
|
||||
let isWorktree = false;
|
||||
let branch: string | undefined;
|
||||
let baseBranch: string | undefined;
|
||||
|
||||
// Handle worktree (now creates a shared clone)
|
||||
if (data.worktree) {
|
||||
baseBranch = getCurrentBranch(defaultCwd);
|
||||
// Summarize task content to English slug using AI
|
||||
info('Generating branch name...');
|
||||
const taskSlug = await summarizeTaskName(task.content, { cwd: defaultCwd });
|
||||
|
||||
info('Creating clone...');
|
||||
const result = createSharedClone(defaultCwd, {
|
||||
worktree: data.worktree,
|
||||
branch: data.branch,
|
||||
taskSlug,
|
||||
issueNumber: data.issue,
|
||||
});
|
||||
execCwd = result.path;
|
||||
branch = result.branch;
|
||||
isWorktree = true;
|
||||
info(`Clone created: ${result.path} (branch: ${result.branch})`);
|
||||
header('Running tasks');
|
||||
if (concurrency > 1) {
|
||||
info(`Concurrency: ${concurrency}`);
|
||||
}
|
||||
|
||||
// Handle piece override
|
||||
const execPiece = data.piece || defaultPiece;
|
||||
// initialTasks is guaranteed non-empty at this point (early return above)
|
||||
const result = concurrency <= 1
|
||||
? await runSequential(taskRunner, initialTasks[0]!, cwd, pieceName, options)
|
||||
: await runParallel(taskRunner, initialTasks, concurrency, cwd, pieceName, options);
|
||||
|
||||
// Handle start_movement override
|
||||
const startMovement = data.start_movement;
|
||||
|
||||
// Handle retry_note
|
||||
const retryNote = data.retry_note;
|
||||
|
||||
// Handle auto_pr (task YAML > global config)
|
||||
let autoPr: boolean | undefined;
|
||||
if (data.auto_pr !== undefined) {
|
||||
autoPr = data.auto_pr;
|
||||
} else {
|
||||
const globalConfig = loadGlobalConfig();
|
||||
autoPr = globalConfig.autoPr;
|
||||
const totalCount = result.success + result.fail;
|
||||
blankLine();
|
||||
header('Tasks Summary');
|
||||
status('Total', String(totalCount));
|
||||
status('Success', String(result.success), result.success === totalCount ? 'green' : undefined);
|
||||
if (result.fail > 0) {
|
||||
status('Failed', String(result.fail), 'red');
|
||||
}
|
||||
|
||||
return { execCwd, execPiece, isWorktree, branch, baseBranch, startMovement, retryNote, autoPr };
|
||||
}
|
||||
|
||||
// Re-export for backward compatibility with existing consumers
|
||||
export { resolveTaskExecution } from './resolveTask.js';
|
||||
|
||||
@ -38,6 +38,10 @@ export interface PieceExecutionOptions {
|
||||
startMovement?: string;
|
||||
/** Retry note explaining why task is being retried */
|
||||
retryNote?: string;
|
||||
/** External abort signal for parallel execution — when provided, SIGINT handling is delegated to caller */
|
||||
abortSignal?: AbortSignal;
|
||||
/** Force quiet mode for streaming output (used in parallel execution to prevent interleaving) */
|
||||
quiet?: boolean;
|
||||
}
|
||||
|
||||
export interface TaskExecutionOptions {
|
||||
@ -64,6 +68,10 @@ export interface ExecuteTaskOptions {
|
||||
startMovement?: string;
|
||||
/** Retry note explaining why task is being retried */
|
||||
retryNote?: string;
|
||||
/** External abort signal for parallel execution — when provided, SIGINT handling is delegated to caller */
|
||||
abortSignal?: AbortSignal;
|
||||
/** Force quiet mode for streaming output (used in parallel execution to prevent interleaving) */
|
||||
quiet?: boolean;
|
||||
}
|
||||
|
||||
export interface PipelineExecutionOptions {
|
||||
|
||||
@ -35,6 +35,7 @@ function createDefaultGlobalConfig(): GlobalConfig {
|
||||
logLevel: 'info',
|
||||
provider: 'claude',
|
||||
enableBuiltinPieces: true,
|
||||
concurrency: 1,
|
||||
};
|
||||
}
|
||||
|
||||
@ -106,6 +107,7 @@ export class GlobalConfigManager {
|
||||
branchNameStrategy: parsed.branch_name_strategy,
|
||||
preventSleep: parsed.prevent_sleep,
|
||||
notificationSound: parsed.notification_sound,
|
||||
concurrency: parsed.concurrency,
|
||||
};
|
||||
validateProviderModelCompatibility(config.provider, config.model);
|
||||
this.cachedConfig = config;
|
||||
@ -175,6 +177,9 @@ export class GlobalConfigManager {
|
||||
if (config.notificationSound !== undefined) {
|
||||
raw.notification_sound = config.notificationSound;
|
||||
}
|
||||
if (config.concurrency !== undefined && config.concurrency > 1) {
|
||||
raw.concurrency = config.concurrency;
|
||||
}
|
||||
writeFileSync(configPath, stringifyYaml(raw), 'utf-8');
|
||||
this.invalidateCache();
|
||||
}
|
||||
|
||||
@ -105,6 +105,14 @@ export class TaskRunner {
|
||||
return tasks[0] ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 次に実行すべきタスクを指定数分取得
|
||||
*/
|
||||
getNextTasks(count: number): TaskInfo[] {
|
||||
const tasks = this.listTasks();
|
||||
return tasks.slice(0, count);
|
||||
}
|
||||
|
||||
/**
|
||||
* タスクを完了としてマーク
|
||||
*
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user