* dist-tag 検証をリトライ付きに変更(npm レジストリの結果整合性対策) * takt run 実行時に蓋閉じスリープを抑制 * takt: github-issue-237-fix-ctrl-c-provider-graceful-timeout-force
221 lines
6.7 KiB
TypeScript
221 lines
6.7 KiB
TypeScript
/**
|
|
* Worker pool task execution strategy.
|
|
*
|
|
* Runs tasks using a fixed-size worker pool. Each worker picks up the next
|
|
* available task as soon as it finishes the current one, maximizing slot
|
|
* utilization. Works for both sequential (concurrency=1) and parallel
|
|
* (concurrency>1) execution through the same code path.
|
|
*
|
|
* Polls for newly added tasks at a configurable interval so that tasks
|
|
* added to .takt/tasks.yaml during execution are picked up without waiting
|
|
* for an active task to complete.
|
|
*/
|
|
|
|
import type { TaskRunner, TaskInfo } from '../../../infra/task/index.js';
|
|
import { info, blankLine } from '../../../shared/ui/index.js';
|
|
import { TaskPrefixWriter } from '../../../shared/ui/TaskPrefixWriter.js';
|
|
import { EXIT_SIGINT } from '../../../shared/exitCodes.js';
|
|
import { createLogger } from '../../../shared/utils/index.js';
|
|
import { executeAndCompleteTask } from './taskExecution.js';
|
|
import { ShutdownManager } from './shutdownManager.js';
|
|
import type { TaskExecutionOptions } from './types.js';
|
|
|
|
const log = createLogger('worker-pool');
|
|
|
|
export interface WorkerPoolResult {
|
|
success: number;
|
|
fail: number;
|
|
}
|
|
|
|
type RaceResult =
|
|
| { type: 'completion'; promise: Promise<boolean>; result: boolean }
|
|
| { type: 'poll' };
|
|
|
|
interface PollTimer {
|
|
promise: Promise<RaceResult>;
|
|
cancel: () => void;
|
|
}
|
|
|
|
function createPollTimer(intervalMs: number, signal: AbortSignal): PollTimer {
|
|
let timeoutId: ReturnType<typeof setTimeout> | undefined;
|
|
let onAbort: (() => void) | undefined;
|
|
|
|
const promise = new Promise<RaceResult>((resolve) => {
|
|
if (signal.aborted) {
|
|
resolve({ type: 'poll' });
|
|
return;
|
|
}
|
|
|
|
onAbort = () => {
|
|
if (timeoutId !== undefined) clearTimeout(timeoutId);
|
|
resolve({ type: 'poll' });
|
|
};
|
|
|
|
timeoutId = setTimeout(() => {
|
|
signal.removeEventListener('abort', onAbort!);
|
|
onAbort = undefined;
|
|
resolve({ type: 'poll' });
|
|
}, intervalMs);
|
|
|
|
signal.addEventListener('abort', onAbort, { once: true });
|
|
});
|
|
|
|
return {
|
|
promise,
|
|
cancel: () => {
|
|
if (timeoutId !== undefined) {
|
|
clearTimeout(timeoutId);
|
|
timeoutId = undefined;
|
|
}
|
|
if (onAbort) {
|
|
signal.removeEventListener('abort', onAbort);
|
|
onAbort = undefined;
|
|
}
|
|
},
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Run tasks using a worker pool with the given concurrency.
|
|
*
|
|
* Algorithm:
|
|
* 1. Create a shared AbortController
|
|
* 2. Maintain a queue of pending tasks and a set of active promises
|
|
* 3. Fill available slots from the queue
|
|
* 4. Wait for any active task to complete OR a poll timer to fire (Promise.race)
|
|
* 5. On task completion: record result
|
|
* 6. On poll tick or completion: claim new tasks and fill freed slots
|
|
* 7. Repeat until queue is empty and all active tasks complete
|
|
*/
|
|
export async function runWithWorkerPool(
|
|
taskRunner: TaskRunner,
|
|
initialTasks: TaskInfo[],
|
|
concurrency: number,
|
|
cwd: string,
|
|
pieceName: string,
|
|
options: TaskExecutionOptions | undefined,
|
|
pollIntervalMs: number,
|
|
): Promise<WorkerPoolResult> {
|
|
const abortController = new AbortController();
|
|
const shutdownManager = new ShutdownManager({
|
|
callbacks: {
|
|
onGraceful: () => abortController.abort(),
|
|
onForceKill: () => process.exit(EXIT_SIGINT),
|
|
},
|
|
});
|
|
shutdownManager.install();
|
|
const selfSigintOnce = process.env.TAKT_E2E_SELF_SIGINT_ONCE === '1';
|
|
const selfSigintTwice = process.env.TAKT_E2E_SELF_SIGINT_TWICE === '1';
|
|
let selfSigintInjected = false;
|
|
|
|
let successCount = 0;
|
|
let failCount = 0;
|
|
|
|
const queue = [...initialTasks];
|
|
const active = new Map<Promise<boolean>, TaskInfo>();
|
|
const colorCounter = { value: 0 };
|
|
|
|
try {
|
|
while (queue.length > 0 || active.size > 0) {
|
|
if (!abortController.signal.aborted) {
|
|
fillSlots(queue, active, concurrency, taskRunner, cwd, pieceName, options, abortController, colorCounter);
|
|
if ((selfSigintOnce || selfSigintTwice) && !selfSigintInjected && active.size > 0) {
|
|
selfSigintInjected = true;
|
|
process.emit('SIGINT');
|
|
if (selfSigintTwice) {
|
|
process.emit('SIGINT');
|
|
}
|
|
}
|
|
}
|
|
|
|
if (active.size === 0) {
|
|
break;
|
|
}
|
|
|
|
const completionPromises: Promise<RaceResult>[] = [...active.keys()].map((p) =>
|
|
p.then(
|
|
(result): RaceResult => ({ type: 'completion', promise: p, result }),
|
|
(): RaceResult => ({ type: 'completion', promise: p, result: false }),
|
|
),
|
|
);
|
|
|
|
let settled: RaceResult;
|
|
if (abortController.signal.aborted) {
|
|
// Graceful shutdown: stop scheduling new work but wait for in-flight tasks to settle.
|
|
settled = await Promise.race(completionPromises);
|
|
} else {
|
|
const pollTimer = createPollTimer(pollIntervalMs, abortController.signal);
|
|
try {
|
|
settled = await Promise.race([...completionPromises, pollTimer.promise]);
|
|
} finally {
|
|
pollTimer.cancel();
|
|
}
|
|
}
|
|
|
|
if (settled.type === 'completion') {
|
|
const task = active.get(settled.promise);
|
|
active.delete(settled.promise);
|
|
|
|
if (task) {
|
|
if (settled.result) {
|
|
successCount++;
|
|
} else {
|
|
failCount++;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!abortController.signal.aborted) {
|
|
const freeSlots = concurrency - active.size;
|
|
if (freeSlots > 0) {
|
|
const newTasks = taskRunner.claimNextTasks(freeSlots);
|
|
log.debug('poll_tick', { active: active.size, queued: queue.length, freeSlots });
|
|
if (newTasks.length > 0) {
|
|
log.debug('poll_new_tasks', { count: newTasks.length });
|
|
queue.push(...newTasks);
|
|
} else {
|
|
log.debug('no_new_tasks');
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} finally {
|
|
shutdownManager.cleanup();
|
|
}
|
|
|
|
return { success: successCount, fail: failCount };
|
|
}
|
|
|
|
function fillSlots(
|
|
queue: TaskInfo[],
|
|
active: Map<Promise<boolean>, TaskInfo>,
|
|
concurrency: number,
|
|
taskRunner: TaskRunner,
|
|
cwd: string,
|
|
pieceName: string,
|
|
options: TaskExecutionOptions | undefined,
|
|
abortController: AbortController,
|
|
colorCounter: { value: number },
|
|
): void {
|
|
while (active.size < concurrency && queue.length > 0) {
|
|
const task = queue.shift()!;
|
|
const isParallel = concurrency > 1;
|
|
const colorIndex = colorCounter.value++;
|
|
|
|
if (isParallel) {
|
|
const writer = new TaskPrefixWriter({ taskName: task.name, colorIndex });
|
|
writer.writeLine(`=== Task: ${task.name} ===`);
|
|
} else {
|
|
blankLine();
|
|
info(`=== Task: ${task.name} ===`);
|
|
}
|
|
|
|
const promise = executeAndCompleteTask(task, taskRunner, cwd, pieceName, options, {
|
|
abortSignal: abortController.signal,
|
|
taskPrefix: isParallel ? task.name : undefined,
|
|
taskColorIndex: isParallel ? colorIndex : undefined,
|
|
});
|
|
active.set(promise, task);
|
|
}
|
|
}
|