refactor(task-store): replace file-based lock with in-memory guard
This commit is contained in:
parent
89cb3f8dbf
commit
251acf8e51
@ -1,4 +1,4 @@
|
||||
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
|
||||
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||||
import { mkdirSync, writeFileSync, existsSync, rmSync, readFileSync } from 'node:fs';
|
||||
import { join } from 'node:path';
|
||||
import { parse as parseYaml, stringify as stringifyYaml } from 'yaml';
|
||||
@ -94,32 +94,6 @@ describe('TaskRunner (tasks.yaml)', () => {
|
||||
expect(recovered).toBe(0);
|
||||
});
|
||||
|
||||
it('should take over stale lock file with invalid pid', () => {
|
||||
mkdirSync(join(testDir, '.takt'), { recursive: true });
|
||||
writeFileSync(join(testDir, '.takt', 'tasks.yaml.lock'), 'invalid-pid', 'utf-8');
|
||||
|
||||
const task = runner.addTask('Task with stale lock');
|
||||
|
||||
expect(task.name).toContain('task-with-stale-lock');
|
||||
expect(existsSync(join(testDir, '.takt', 'tasks.yaml.lock'))).toBe(false);
|
||||
});
|
||||
|
||||
it('should timeout when lock file is held by a live process', () => {
|
||||
mkdirSync(join(testDir, '.takt'), { recursive: true });
|
||||
writeFileSync(join(testDir, '.takt', 'tasks.yaml.lock'), String(process.pid), 'utf-8');
|
||||
|
||||
const dateNowSpy = vi.spyOn(Date, 'now');
|
||||
dateNowSpy.mockReturnValueOnce(0);
|
||||
dateNowSpy.mockReturnValue(5_000);
|
||||
|
||||
try {
|
||||
expect(() => runner.listTasks()).toThrow('Failed to acquire tasks lock within 5000ms');
|
||||
} finally {
|
||||
dateNowSpy.mockRestore();
|
||||
rmSync(join(testDir, '.takt', 'tasks.yaml.lock'), { force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it('should recover from corrupted tasks.yaml and allow adding tasks again', () => {
|
||||
mkdirSync(join(testDir, '.takt'), { recursive: true });
|
||||
writeFileSync(join(testDir, '.takt', 'tasks.yaml'), 'tasks:\n - name: [broken', 'utf-8');
|
||||
|
||||
@ -5,28 +5,15 @@ import { TasksFileSchema, type TasksFileData } from './schema.js';
|
||||
import { createLogger } from '../../shared/utils/index.js';
|
||||
|
||||
const log = createLogger('task-store');
|
||||
const LOCK_WAIT_MS = 5_000;
|
||||
const LOCK_POLL_MS = 50;
|
||||
|
||||
function sleepSync(ms: number): void {
|
||||
const arr = new Int32Array(new SharedArrayBuffer(4));
|
||||
Atomics.wait(arr, 0, 0, ms);
|
||||
}
|
||||
|
||||
function fsErrorCode(err: unknown): string | undefined {
|
||||
return (err as NodeJS.ErrnoException).code;
|
||||
}
|
||||
|
||||
export class TaskStore {
|
||||
private readonly tasksFile: string;
|
||||
private readonly lockFile: string;
|
||||
private readonly taktDir: string;
|
||||
private lockOwned = false;
|
||||
private locked = false;
|
||||
|
||||
constructor(private readonly projectDir: string) {
|
||||
this.taktDir = path.join(projectDir, '.takt');
|
||||
this.tasksFile = path.join(this.taktDir, 'tasks.yaml');
|
||||
this.lockFile = path.join(this.taktDir, 'tasks.yaml.lock');
|
||||
}
|
||||
|
||||
getTasksFilePath(): string {
|
||||
@ -84,97 +71,14 @@ export class TaskStore {
|
||||
}
|
||||
|
||||
private withLock<T>(fn: () => T): T {
|
||||
this.acquireLock();
|
||||
if (this.locked) {
|
||||
throw new Error('TaskStore: reentrant lock detected');
|
||||
}
|
||||
this.locked = true;
|
||||
try {
|
||||
return fn();
|
||||
} finally {
|
||||
this.releaseLock();
|
||||
}
|
||||
}
|
||||
|
||||
private acquireLock(): void {
|
||||
this.ensureDirs();
|
||||
const start = Date.now();
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
fs.writeFileSync(this.lockFile, String(process.pid), { encoding: 'utf-8', flag: 'wx' });
|
||||
this.lockOwned = true;
|
||||
return;
|
||||
} catch (err) {
|
||||
if (fsErrorCode(err) !== 'EEXIST') {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
if (this.isStaleLock()) {
|
||||
this.removeStaleLock();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (Date.now() - start >= LOCK_WAIT_MS) {
|
||||
throw new Error(`Failed to acquire tasks lock within ${LOCK_WAIT_MS}ms`);
|
||||
}
|
||||
|
||||
sleepSync(LOCK_POLL_MS);
|
||||
}
|
||||
}
|
||||
|
||||
private isStaleLock(): boolean {
|
||||
let pidRaw: string;
|
||||
try {
|
||||
pidRaw = fs.readFileSync(this.lockFile, 'utf-8').trim();
|
||||
} catch (err) {
|
||||
const code = fsErrorCode(err);
|
||||
if (code === 'ENOENT' || code === 'EPERM') {
|
||||
return false;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
|
||||
const pid = Number.parseInt(pidRaw, 10);
|
||||
if (!Number.isInteger(pid) || pid <= 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return !this.isProcessAlive(pid);
|
||||
}
|
||||
|
||||
private removeStaleLock(): void {
|
||||
try {
|
||||
fs.unlinkSync(this.lockFile);
|
||||
} catch (err) {
|
||||
if (fsErrorCode(err) !== 'ENOENT') {
|
||||
log.debug('Failed to remove stale lock, retrying.', { lockFile: this.lockFile, error: String(err) });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private isProcessAlive(pid: number): boolean {
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
return true;
|
||||
} catch (err) {
|
||||
const code = fsErrorCode(err);
|
||||
if (code === 'ESRCH') {
|
||||
return false;
|
||||
}
|
||||
if (code === 'EPERM') {
|
||||
return true;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
private releaseLock(): void {
|
||||
if (!this.lockOwned) return;
|
||||
this.lockOwned = false;
|
||||
|
||||
try {
|
||||
fs.unlinkSync(this.lockFile);
|
||||
} catch (err) {
|
||||
if (fsErrorCode(err) === 'ENOENT') return;
|
||||
log.debug('Failed to release tasks lock.', { lockFile: this.lockFile, error: String(err) });
|
||||
this.locked = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user