github-issue-200-arpeggio (#203)

* fix: stable release時にnext dist-tagを自動同期

* takt: github-issue-200-arpeggio
This commit is contained in:
nrs 2026-02-10 13:37:15 +09:00 committed by GitHub
parent d73643dcd9
commit 7e15691ba2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 1802 additions and 7 deletions

View File

@ -52,12 +52,47 @@ jobs:
run: | run: |
VERSION="${{ needs.tag.outputs.tag }}" VERSION="${{ needs.tag.outputs.tag }}"
VERSION="${VERSION#v}" VERSION="${VERSION#v}"
echo "version=$VERSION" >> "$GITHUB_OUTPUT"
if echo "$VERSION" | grep -qE '(alpha|beta|rc|next)'; then if echo "$VERSION" | grep -qE '(alpha|beta|rc|next)'; then
echo "tag=next" >> "$GITHUB_OUTPUT" echo "tag=next" >> "$GITHUB_OUTPUT"
else else
echo "tag=latest" >> "$GITHUB_OUTPUT" echo "tag=latest" >> "$GITHUB_OUTPUT"
fi fi
- run: npm publish --tag ${{ steps.npm-tag.outputs.tag }} - name: Publish package
run: npm publish --tag ${{ steps.npm-tag.outputs.tag }}
env: env:
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
- name: Sync next tag on stable release
if: steps.npm-tag.outputs.tag == 'latest'
run: |
PACKAGE_NAME=$(node -p "require('./package.json').name")
VERSION="${{ steps.npm-tag.outputs.version }}"
for attempt in 1 2 3; do
if npm dist-tag add "${PACKAGE_NAME}@${VERSION}" next; then
exit 0
fi
if [ "$attempt" -eq 3 ]; then
echo "Failed to sync next tag after 3 attempts."
exit 1
fi
sleep $((attempt * 5))
done
env:
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
- name: Verify dist-tags
run: |
PACKAGE_NAME=$(node -p "require('./package.json').name")
LATEST=$(npm view "${PACKAGE_NAME}" dist-tags.latest)
NEXT=$(npm view "${PACKAGE_NAME}" dist-tags.next || true)
echo "latest=${LATEST}"
echo "next=${NEXT}"
if [ "${{ steps.npm-tag.outputs.tag }}" = "latest" ] && [ "${LATEST}" != "${NEXT}" ]; then
echo "Expected next to match latest on stable release, but they differ."
exit 1
fi

View File

@ -0,0 +1,136 @@
/**
* Tests for CSV data source parsing and batch reading.
*/
import { describe, it, expect } from 'vitest';
import { parseCsv, CsvDataSource } from '../core/piece/arpeggio/csv-data-source.js';
import { writeFileSync, mkdirSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { randomUUID } from 'node:crypto';
describe('parseCsv', () => {
  // Each case feeds raw CSV text straight into parseCsv and checks the
  // resulting row/field matrix.
  it('should parse simple CSV content', () => {
    expect(parseCsv('name,age\nAlice,30\nBob,25')).toEqual([
      ['name', 'age'],
      ['Alice', '30'],
      ['Bob', '25'],
    ]);
  });
  it('should handle quoted fields', () => {
    expect(parseCsv('name,description\nAlice,"Hello, World"\nBob,"Line1"')).toEqual([
      ['name', 'description'],
      ['Alice', 'Hello, World'],
      ['Bob', 'Line1'],
    ]);
  });
  it('should handle escaped quotes (double quotes)', () => {
    expect(parseCsv('name,value\nAlice,"He said ""hello"""\nBob,simple')).toEqual([
      ['name', 'value'],
      ['Alice', 'He said "hello"'],
      ['Bob', 'simple'],
    ]);
  });
  it('should handle CRLF line endings', () => {
    expect(parseCsv('name,age\r\nAlice,30\r\nBob,25')).toEqual([
      ['name', 'age'],
      ['Alice', '30'],
      ['Bob', '25'],
    ]);
  });
  it('should handle bare CR line endings', () => {
    expect(parseCsv('name,age\rAlice,30\rBob,25')).toEqual([
      ['name', 'age'],
      ['Alice', '30'],
      ['Bob', '25'],
    ]);
  });
  it('should handle empty fields', () => {
    expect(parseCsv('a,b,c\n1,,3\n,,')).toEqual([
      ['a', 'b', 'c'],
      ['1', '', '3'],
      ['', '', ''],
    ]);
  });
  it('should handle newlines within quoted fields', () => {
    expect(parseCsv('name,bio\nAlice,"Line1\nLine2"\nBob,simple')).toEqual([
      ['name', 'bio'],
      ['Alice', 'Line1\nLine2'],
      ['Bob', 'simple'],
    ]);
  });
});
describe('CsvDataSource', () => {
  /** Writes CSV content into a fresh temp directory and returns the file path. */
  function writeTempCsv(content: string): string {
    const dir = join(tmpdir(), `takt-csv-test-${randomUUID()}`);
    mkdirSync(dir, { recursive: true });
    const target = join(dir, 'test.csv');
    writeFileSync(target, content, 'utf-8');
    return target;
  }
  it('should read batches with batch_size 1', async () => {
    const ds = new CsvDataSource(writeTempCsv('name,age\nAlice,30\nBob,25\nCharlie,35'));
    const batches = await ds.readBatches(1);
    expect(batches).toHaveLength(3);
    expect(batches[0]!.rows).toEqual([{ name: 'Alice', age: '30' }]);
    expect(batches[0]!.batchIndex).toBe(0);
    expect(batches[0]!.totalBatches).toBe(3);
    expect(batches[1]!.rows).toEqual([{ name: 'Bob', age: '25' }]);
    expect(batches[2]!.rows).toEqual([{ name: 'Charlie', age: '35' }]);
  });
  it('should read batches with batch_size 2', async () => {
    const ds = new CsvDataSource(writeTempCsv('name,age\nAlice,30\nBob,25\nCharlie,35'));
    const batches = await ds.readBatches(2);
    expect(batches).toHaveLength(2);
    expect(batches[0]!.rows).toEqual([
      { name: 'Alice', age: '30' },
      { name: 'Bob', age: '25' },
    ]);
    expect(batches[0]!.totalBatches).toBe(2);
    // Last batch holds the remainder row.
    expect(batches[1]!.rows).toEqual([{ name: 'Charlie', age: '35' }]);
  });
  it('should throw when CSV has no data rows', async () => {
    const ds = new CsvDataSource(writeTempCsv('name,age'));
    await expect(ds.readBatches(1)).rejects.toThrow('CSV file has no data rows');
  });
  it('should handle missing columns by returning empty string', async () => {
    const ds = new CsvDataSource(writeTempCsv('a,b,c\n1,2\n3'));
    const batches = await ds.readBatches(1);
    expect(batches[0]!.rows).toEqual([{ a: '1', b: '2', c: '' }]);
    expect(batches[1]!.rows).toEqual([{ a: '3', b: '', c: '' }]);
  });
});

View File

@ -0,0 +1,50 @@
/**
* Tests for the arpeggio data source factory.
*
* Covers:
* - Built-in 'csv' source type returns CsvDataSource
* - Custom module: valid default export returns a data source
* - Custom module: non-function default export throws
* - Custom module: missing default export throws
*/
import { describe, it, expect } from 'vitest';
import { createDataSource } from '../core/piece/arpeggio/data-source-factory.js';
import { CsvDataSource } from '../core/piece/arpeggio/csv-data-source.js';
describe('createDataSource', () => {
  /** Wraps inline ES-module source into an importable data: URL. */
  const inlineModule = (code: string): string =>
    new URL(`data:text/javascript,${code}`).href;

  it('should return a CsvDataSource for built-in "csv" type', async () => {
    const source = await createDataSource('csv', '/path/to/data.csv');
    expect(source).toBeInstanceOf(CsvDataSource);
  });
  it('should return a valid data source from a custom module with correct default export', async () => {
    const moduleUrl = inlineModule('export default function(path) { return { readBatches: async () => [] }; }');
    const source = await createDataSource(moduleUrl, '/some/path');
    expect(source).toBeDefined();
    expect(typeof source.readBatches).toBe('function');
  });
  it('should throw when custom module does not export a default function', async () => {
    const moduleUrl = inlineModule('export default "not-a-function"');
    await expect(createDataSource(moduleUrl, '/some/path')).rejects.toThrow(
      /must export a default factory function/
    );
  });
  it('should throw when custom module has no default export', async () => {
    const moduleUrl = inlineModule('export const foo = 42');
    await expect(createDataSource(moduleUrl, '/some/path')).rejects.toThrow(
      /must export a default factory function/
    );
  });
});

View File

@ -0,0 +1,108 @@
/**
* Tests for arpeggio merge processing.
*/
import { describe, it, expect } from 'vitest';
import { buildMergeFn } from '../core/piece/arpeggio/merge.js';
import type { ArpeggioMergeMovementConfig } from '../core/piece/arpeggio/types.js';
import type { BatchResult } from '../core/piece/arpeggio/types.js';
/** Builds a BatchResult fixture; successful unless `success` is overridden. */
function makeResult(batchIndex: number, content: string, success = true): BatchResult {
  const fixture: BatchResult = { batchIndex, content, success };
  return fixture;
}
/** Builds a failed BatchResult fixture (empty content) carrying an error message. */
function makeFailedResult(batchIndex: number, error: string): BatchResult {
  const failed: BatchResult = { batchIndex, content: '', success: false, error };
  return failed;
}
// Exercises buildMergeFn: the factory that turns an ArpeggioMergeMovementConfig
// into a synchronous merge function over per-batch results.
describe('buildMergeFn', () => {
  describe('concat strategy', () => {
    it('should concatenate results with default separator (newline)', async () => {
      const config: ArpeggioMergeMovementConfig = { strategy: 'concat' };
      const mergeFn = await buildMergeFn(config);
      const results = [
        makeResult(0, 'Result A'),
        makeResult(1, 'Result B'),
        makeResult(2, 'Result C'),
      ];
      expect(mergeFn(results)).toBe('Result A\nResult B\nResult C');
    });
    it('should concatenate results with custom separator', async () => {
      const config: ArpeggioMergeMovementConfig = { strategy: 'concat', separator: '\n---\n' };
      const mergeFn = await buildMergeFn(config);
      const results = [
        makeResult(0, 'A'),
        makeResult(1, 'B'),
      ];
      expect(mergeFn(results)).toBe('A\n---\nB');
    });
    it('should sort results by batch index', async () => {
      const config: ArpeggioMergeMovementConfig = { strategy: 'concat' };
      const mergeFn = await buildMergeFn(config);
      // Deliberately out of order: merge must reorder by batchIndex.
      const results = [
        makeResult(2, 'C'),
        makeResult(0, 'A'),
        makeResult(1, 'B'),
      ];
      expect(mergeFn(results)).toBe('A\nB\nC');
    });
    it('should filter out failed results', async () => {
      const config: ArpeggioMergeMovementConfig = { strategy: 'concat' };
      const mergeFn = await buildMergeFn(config);
      const results = [
        makeResult(0, 'A'),
        makeFailedResult(1, 'oops'),
        makeResult(2, 'C'),
      ];
      expect(mergeFn(results)).toBe('A\nC');
    });
    it('should return empty string when all results failed', async () => {
      const config: ArpeggioMergeMovementConfig = { strategy: 'concat' };
      const mergeFn = await buildMergeFn(config);
      const results = [
        makeFailedResult(0, 'error1'),
        makeFailedResult(1, 'error2'),
      ];
      expect(mergeFn(results)).toBe('');
    });
  });
  // The inline_js body receives `results` and must return a string.
  describe('custom strategy with inline_js', () => {
    it('should execute inline JS merge function', async () => {
      const config: ArpeggioMergeMovementConfig = {
        strategy: 'custom',
        inlineJs: 'return results.filter(r => r.success).map(r => r.content.toUpperCase()).join(", ");',
      };
      const mergeFn = await buildMergeFn(config);
      const results = [
        makeResult(0, 'hello'),
        makeResult(1, 'world'),
      ];
      expect(mergeFn(results)).toBe('HELLO, WORLD');
    });
    it('should throw when inline JS returns non-string', async () => {
      const config: ArpeggioMergeMovementConfig = {
        strategy: 'custom',
        inlineJs: 'return 42;',
      };
      const mergeFn = await buildMergeFn(config);
      // The type check happens at merge time, not at build time.
      expect(() => mergeFn([makeResult(0, 'test')])).toThrow(
        'Inline JS merge function must return a string, got number'
      );
    });
  });
  describe('custom strategy validation', () => {
    it('should throw when custom strategy has neither inline_js nor file', async () => {
      const config: ArpeggioMergeMovementConfig = { strategy: 'custom' };
      await expect(buildMergeFn(config)).rejects.toThrow(
        'Custom merge strategy requires either inline_js or file path'
      );
    });
  });
});

View File

@ -0,0 +1,332 @@
/**
* Tests for Arpeggio-related Zod schemas.
*
* Covers:
* - ArpeggioMergeRawSchema cross-validation (.refine())
* - ArpeggioConfigRawSchema required fields and defaults
* - PieceMovementRawSchema with arpeggio field
*/
import { describe, it, expect } from 'vitest';
import {
ArpeggioMergeRawSchema,
ArpeggioConfigRawSchema,
PieceMovementRawSchema,
} from '../core/models/index.js';
describe('ArpeggioMergeRawSchema', () => {
  // Shorthand so every case reads as: input -> verdict.
  const parse = (input: unknown) => ArpeggioMergeRawSchema.safeParse(input);

  it('should accept concat strategy without inline_js or file', () => {
    expect(parse({ strategy: 'concat' }).success).toBe(true);
  });
  it('should accept concat strategy with separator', () => {
    const result = parse({ strategy: 'concat', separator: '\n---\n' });
    expect(result.success).toBe(true);
    if (result.success) {
      expect(result.data.separator).toBe('\n---\n');
    }
  });
  it('should default strategy to concat when omitted', () => {
    const result = parse({});
    expect(result.success).toBe(true);
    if (result.success) {
      expect(result.data.strategy).toBe('concat');
    }
  });
  it('should accept custom strategy with inline_js', () => {
    expect(parse({ strategy: 'custom', inline_js: 'return results.map(r => r.content).join(",");' }).success).toBe(true);
  });
  it('should accept custom strategy with file', () => {
    expect(parse({ strategy: 'custom', file: './merge.js' }).success).toBe(true);
  });
  // Cross-field validation (.refine): 'custom' needs a body, 'concat' forbids one.
  it('should reject custom strategy without inline_js or file', () => {
    expect(parse({ strategy: 'custom' }).success).toBe(false);
  });
  it('should reject concat strategy with inline_js', () => {
    expect(parse({ strategy: 'concat', inline_js: 'return "hello";' }).success).toBe(false);
  });
  it('should reject concat strategy with file', () => {
    expect(parse({ strategy: 'concat', file: './merge.js' }).success).toBe(false);
  });
  it('should reject invalid strategy value', () => {
    expect(parse({ strategy: 'invalid' }).success).toBe(false);
  });
});
describe('ArpeggioConfigRawSchema', () => {
  // Minimal config satisfying all required fields; cases spread over it.
  const validConfig = {
    source: 'csv',
    source_path: './data.csv',
    template: './template.md',
  };
  const parse = (input: unknown) => ArpeggioConfigRawSchema.safeParse(input);

  it('should accept a valid minimal config', () => {
    expect(parse(validConfig).success).toBe(true);
  });
  it('should apply default values for optional fields', () => {
    const result = parse(validConfig);
    expect(result.success).toBe(true);
    if (result.success) {
      expect(result.data.batch_size).toBe(1);
      expect(result.data.concurrency).toBe(1);
      expect(result.data.max_retries).toBe(2);
      expect(result.data.retry_delay_ms).toBe(1000);
    }
  });
  it('should accept explicit values overriding defaults', () => {
    const result = parse({
      ...validConfig,
      batch_size: 5,
      concurrency: 3,
      max_retries: 4,
      retry_delay_ms: 2000,
    });
    expect(result.success).toBe(true);
    if (result.success) {
      expect(result.data.batch_size).toBe(5);
      expect(result.data.concurrency).toBe(3);
      expect(result.data.max_retries).toBe(4);
      expect(result.data.retry_delay_ms).toBe(2000);
    }
  });
  it('should accept config with merge field', () => {
    expect(parse({ ...validConfig, merge: { strategy: 'concat', separator: '---' } }).success).toBe(true);
  });
  it('should accept config with output_path', () => {
    const result = parse({ ...validConfig, output_path: './output.txt' });
    expect(result.success).toBe(true);
    if (result.success) {
      expect(result.data.output_path).toBe('./output.txt');
    }
  });
  // Required fields: empty strings and omissions are both rejected.
  it('should reject when source is empty', () => {
    expect(parse({ ...validConfig, source: '' }).success).toBe(false);
  });
  it('should reject when source is missing', () => {
    const { source: _ignored, ...rest } = validConfig;
    expect(parse(rest).success).toBe(false);
  });
  it('should reject when source_path is empty', () => {
    expect(parse({ ...validConfig, source_path: '' }).success).toBe(false);
  });
  it('should reject when source_path is missing', () => {
    const { source_path: _ignored, ...rest } = validConfig;
    expect(parse(rest).success).toBe(false);
  });
  it('should reject when template is empty', () => {
    expect(parse({ ...validConfig, template: '' }).success).toBe(false);
  });
  it('should reject when template is missing', () => {
    const { template: _ignored, ...rest } = validConfig;
    expect(parse(rest).success).toBe(false);
  });
  // Numeric bounds: batch_size/concurrency must be positive integers,
  // max_retries must be a non-negative integer.
  it('should reject batch_size of 0', () => {
    expect(parse({ ...validConfig, batch_size: 0 }).success).toBe(false);
  });
  it('should reject negative batch_size', () => {
    expect(parse({ ...validConfig, batch_size: -1 }).success).toBe(false);
  });
  it('should reject concurrency of 0', () => {
    expect(parse({ ...validConfig, concurrency: 0 }).success).toBe(false);
  });
  it('should reject negative concurrency', () => {
    expect(parse({ ...validConfig, concurrency: -1 }).success).toBe(false);
  });
  it('should reject negative max_retries', () => {
    expect(parse({ ...validConfig, max_retries: -1 }).success).toBe(false);
  });
  it('should accept max_retries of 0', () => {
    const result = parse({ ...validConfig, max_retries: 0 });
    expect(result.success).toBe(true);
    if (result.success) {
      expect(result.data.max_retries).toBe(0);
    }
  });
  it('should reject non-integer batch_size', () => {
    expect(parse({ ...validConfig, batch_size: 1.5 }).success).toBe(false);
  });
});
describe('PieceMovementRawSchema with arpeggio', () => {
  // Minimal arpeggio payload reused across cases.
  const baseArpeggio = {
    source: 'csv',
    source_path: './data.csv',
    template: './prompt.md',
  };

  it('should accept a movement with arpeggio config', () => {
    const result = PieceMovementRawSchema.safeParse({
      name: 'batch-process',
      arpeggio: { ...baseArpeggio },
    });
    expect(result.success).toBe(true);
    if (result.success) {
      expect(result.data.arpeggio).toBeDefined();
      expect(result.data.arpeggio!.source).toBe('csv');
    }
  });
  it('should accept a movement with arpeggio and rules', () => {
    const result = PieceMovementRawSchema.safeParse({
      name: 'batch-process',
      arpeggio: { ...baseArpeggio, batch_size: 2, concurrency: 3 },
      rules: [
        { condition: 'All processed', next: 'COMPLETE' },
        { condition: 'Errors found', next: 'fix' },
      ],
    });
    expect(result.success).toBe(true);
    if (result.success) {
      expect(result.data.arpeggio!.batch_size).toBe(2);
      expect(result.data.arpeggio!.concurrency).toBe(3);
      expect(result.data.rules).toHaveLength(2);
    }
  });
  it('should accept a movement without arpeggio (normal movement)', () => {
    const result = PieceMovementRawSchema.safeParse({
      name: 'normal-step',
      persona: 'coder.md',
      instruction_template: 'Do work',
    });
    expect(result.success).toBe(true);
    if (result.success) {
      expect(result.data.arpeggio).toBeUndefined();
    }
  });
  it('should accept a movement with arpeggio including custom merge', () => {
    const result = PieceMovementRawSchema.safeParse({
      name: 'custom-merge-step',
      arpeggio: {
        ...baseArpeggio,
        merge: {
          strategy: 'custom',
          inline_js: 'return results.map(r => r.content).join(", ");',
        },
        output_path: './output.txt',
      },
    });
    expect(result.success).toBe(true);
    if (result.success) {
      expect(result.data.arpeggio!.merge).toBeDefined();
      expect(result.data.arpeggio!.output_path).toBe('./output.txt');
    }
  });
});

View File

@ -0,0 +1,83 @@
/**
* Tests for arpeggio template expansion.
*/
import { describe, it, expect } from 'vitest';
import { expandTemplate } from '../core/piece/arpeggio/template.js';
import type { DataBatch } from '../core/piece/arpeggio/types.js';
/** Wraps rows into a DataBatch fixture; defaults describe a single-batch run. */
function makeBatch(rows: Record<string, string>[], batchIndex = 0, totalBatches = 1): DataBatch {
  const batch: DataBatch = { rows, batchIndex, totalBatches };
  return batch;
}
describe('expandTemplate', () => {
  it('should expand {line:1} with formatted row data', () => {
    const out = expandTemplate('Process this: {line:1}', makeBatch([{ name: 'Alice', age: '30' }]));
    expect(out).toBe('Process this: name: Alice\nage: 30');
  });
  it('should expand {line:1} and {line:2} for multi-row batches', () => {
    const data = makeBatch([
      { name: 'Alice', age: '30' },
      { name: 'Bob', age: '25' },
    ]);
    expect(expandTemplate('Row 1: {line:1}\nRow 2: {line:2}', data)).toBe(
      'Row 1: name: Alice\nage: 30\nRow 2: name: Bob\nage: 25'
    );
  });
  it('should expand {col:N:name} with specific column values', () => {
    const data = makeBatch([{ name: 'Alice', age: '30', city: 'Tokyo' }]);
    expect(expandTemplate('Name: {col:1:name}, City: {col:1:city}', data)).toBe('Name: Alice, City: Tokyo');
  });
  it('should expand {batch_index} and {total_batches}', () => {
    const data = makeBatch([{ name: 'Alice' }], 2, 5);
    expect(expandTemplate('Batch {batch_index} of {total_batches}', data)).toBe('Batch 2 of 5');
  });
  it('should expand all placeholder types in a single template', () => {
    const data = makeBatch(
      [
        { name: 'Alice', role: 'dev' },
        { name: 'Bob', role: 'pm' },
      ],
      0,
      3
    );
    const out = expandTemplate('Batch {batch_index}/{total_batches}\nFirst: {col:1:name}\nSecond: {line:2}', data);
    expect(out).toBe('Batch 0/3\nFirst: Alice\nSecond: name: Bob\nrole: pm');
  });
  // Error cases: row references are 1-based and column names must exist.
  it('should throw when {line:N} references out-of-range row', () => {
    const data = makeBatch([{ name: 'Alice' }]);
    expect(() => expandTemplate('{line:2}', data)).toThrow(
      'Template placeholder {line:2} references row 2 but batch has 1 rows'
    );
  });
  it('should throw when {col:N:name} references out-of-range row', () => {
    const data = makeBatch([{ name: 'Alice' }]);
    expect(() => expandTemplate('{col:2:name}', data)).toThrow(
      'Template placeholder {col:2:name} references row 2 but batch has 1 rows'
    );
  });
  it('should throw when {col:N:name} references unknown column', () => {
    const data = makeBatch([{ name: 'Alice' }]);
    expect(() => expandTemplate('{col:1:missing}', data)).toThrow(
      'Template placeholder {col:1:missing} references unknown column "missing"'
    );
  });
  it('should handle templates with no placeholders', () => {
    const data = makeBatch([{ name: 'Alice' }]);
    expect(expandTemplate('No placeholders here', data)).toBe('No placeholders here');
  });
  it('should handle multiple occurrences of the same placeholder', () => {
    const data = makeBatch([{ name: 'Alice' }], 1, 3);
    expect(expandTemplate('{batch_index} and {batch_index}', data)).toBe('1 and 1');
  });
});

View File

@ -0,0 +1,275 @@
/**
* Integration tests for arpeggio movement execution via PieceEngine.
*
 * Tests the full pipeline: CSV → template expansion → LLM → merge → rule evaluation.
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { writeFileSync, mkdirSync } from 'node:fs';
import { join } from 'node:path';
// Mock external dependencies before importing
vi.mock('../agents/runner.js', () => ({
runAgent: vi.fn(),
}));
vi.mock('../core/piece/evaluation/index.js', () => ({
detectMatchedRule: vi.fn(),
evaluateAggregateConditions: vi.fn(),
}));
vi.mock('../core/piece/phase-runner.js', () => ({
needsStatusJudgmentPhase: vi.fn().mockReturnValue(false),
runReportPhase: vi.fn().mockResolvedValue(undefined),
runStatusJudgmentPhase: vi.fn().mockResolvedValue(''),
}));
vi.mock('../shared/utils/index.js', async () => {
const actual = await vi.importActual<typeof import('../shared/utils/index.js')>('../shared/utils/index.js');
return {
...actual,
generateReportDir: vi.fn().mockReturnValue('test-report-dir'),
};
});
import { runAgent } from '../agents/runner.js';
import { detectMatchedRule } from '../core/piece/evaluation/index.js';
import { PieceEngine } from '../core/piece/engine/PieceEngine.js';
import type { PieceConfig, PieceMovement, AgentResponse, ArpeggioMovementConfig } from '../core/models/index.js';
import type { PieceEngineOptions } from '../core/piece/types.js';
import {
makeResponse,
makeMovement,
makeRule,
createTestTmpDir,
cleanupPieceEngine,
} from './engine-test-helpers.js';
import type { RuleMatch } from '../core/piece/index.js';
/** Sets up a temp dir holding a 3-row CSV and a one-line prompt template. */
function createArpeggioTestDir(): { tmpDir: string; csvPath: string; templatePath: string } {
  const dir = createTestTmpDir();
  const csv = join(dir, 'data.csv');
  const template = join(dir, 'template.md');
  writeFileSync(csv, 'name,task\nAlice,review\nBob,implement\nCharlie,test', 'utf-8');
  writeFileSync(template, 'Process {line:1}', 'utf-8');
  return { tmpDir: dir, csvPath: csv, templatePath: template };
}
/**
 * Builds a minimal arpeggio config (single-row batches, serial, no retries);
 * overrides are spread last so callers can replace any field.
 */
function createArpeggioConfig(csvPath: string, templatePath: string, overrides: Partial<ArpeggioMovementConfig> = {}): ArpeggioMovementConfig {
  const base: ArpeggioMovementConfig = {
    source: 'csv',
    sourcePath: csvPath,
    batchSize: 1,
    concurrency: 1,
    templatePath,
    merge: { strategy: 'concat' },
    maxRetries: 0,
    retryDelayMs: 0,
  };
  return { ...base, ...overrides };
}
/**
 * Builds a single-movement piece whose 'process' movement carries the given
 * arpeggio config and two terminal rules (COMPLETE / ABORT).
 * The tmpDir parameter is accepted for call-site symmetry but unused here.
 */
function buildArpeggioPieceConfig(arpeggioConfig: ArpeggioMovementConfig, tmpDir: string): PieceConfig {
  const processMovement = {
    ...makeMovement('process', {
      rules: [
        makeRule('Processing complete', 'COMPLETE'),
        makeRule('Processing failed', 'ABORT'),
      ],
    }),
    arpeggio: arpeggioConfig,
  };
  return {
    name: 'test-arpeggio',
    description: 'Test arpeggio piece',
    maxIterations: 10,
    initialMovement: 'process',
    movements: [processMovement],
  };
}
/** Minimal engine options: rule detection and AI judge both pinned to index 0. */
function createEngineOptions(tmpDir: string): PieceEngineOptions {
  const options: PieceEngineOptions = {
    projectCwd: tmpDir,
    detectRuleIndex: () => 0,
    callAiJudge: async () => 0,
  };
  return options;
}
describe('ArpeggioRunner integration', () => {
  // Engine under test; torn down after each case so report dirs/resources are released.
  let engine: PieceEngine | undefined;
  beforeEach(() => {
    vi.resetAllMocks();
    // Default: no rule matches; individual tests install one-shot matches below.
    vi.mocked(detectMatchedRule).mockResolvedValue(undefined);
  });
  afterEach(() => {
    if (engine) {
      cleanupPieceEngine(engine);
      engine = undefined;
    }
  });
  it('should process CSV data and merge results', async () => {
    const { tmpDir, csvPath, templatePath } = createArpeggioTestDir();
    const arpeggioConfig = createArpeggioConfig(csvPath, templatePath);
    const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir);
    // Mock agent to return batch-specific responses
    // NOTE(review): the Once-chain assumes batches consume the agent mock in
    // row order (Alice, Bob, Charlie) — holds here since concurrency is 1.
    const mockAgent = vi.mocked(runAgent);
    mockAgent
      .mockResolvedValueOnce(makeResponse({ content: 'Processed Alice' }))
      .mockResolvedValueOnce(makeResponse({ content: 'Processed Bob' }))
      .mockResolvedValueOnce(makeResponse({ content: 'Processed Charlie' }));
    // Mock rule detection for the merged result
    vi.mocked(detectMatchedRule).mockResolvedValueOnce({
      index: 0,
      method: 'phase1_tag',
    });
    engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir));
    const state = await engine.run();
    expect(state.status).toBe('completed');
    expect(mockAgent).toHaveBeenCalledTimes(3);
    // Verify merged content in movement output
    const output = state.movementOutputs.get('process');
    expect(output).toBeDefined();
    expect(output!.content).toBe('Processed Alice\nProcessed Bob\nProcessed Charlie');
  });
  it('should handle batch_size > 1', async () => {
    const tmpDir = createTestTmpDir();
    const csvPath = join(tmpDir, 'data.csv');
    const templatePath = join(tmpDir, 'batch-template.md');
    // 4 rows so batch_size=2 gives exactly 2 batches with 2 rows each
    writeFileSync(csvPath, 'name,task\nAlice,review\nBob,implement\nCharlie,test\nDave,deploy', 'utf-8');
    writeFileSync(templatePath, 'Row1: {line:1}\nRow2: {line:2}', 'utf-8');
    const arpeggioConfig = createArpeggioConfig(csvPath, templatePath, { batchSize: 2 });
    const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir);
    const mockAgent = vi.mocked(runAgent);
    mockAgent
      .mockResolvedValueOnce(makeResponse({ content: 'Batch 0 result' }))
      .mockResolvedValueOnce(makeResponse({ content: 'Batch 1 result' }));
    vi.mocked(detectMatchedRule).mockResolvedValueOnce({
      index: 0,
      method: 'phase1_tag',
    });
    engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir));
    const state = await engine.run();
    expect(state.status).toBe('completed');
    // 4 rows / batch_size 2 = 2 batches
    expect(mockAgent).toHaveBeenCalledTimes(2);
  });
  it('should abort when a batch fails and retries are exhausted', async () => {
    const { tmpDir, csvPath, templatePath } = createArpeggioTestDir();
    const arpeggioConfig = createArpeggioConfig(csvPath, templatePath, {
      maxRetries: 1,
      retryDelayMs: 0,
    });
    const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir);
    const mockAgent = vi.mocked(runAgent);
    // First batch succeeds
    mockAgent.mockResolvedValueOnce(makeResponse({ content: 'OK' }));
    // Second batch fails twice (initial + 1 retry)
    mockAgent.mockResolvedValueOnce(makeResponse({ status: 'error', error: 'fail1' }));
    mockAgent.mockResolvedValueOnce(makeResponse({ status: 'error', error: 'fail2' }));
    // Third batch succeeds
    mockAgent.mockResolvedValueOnce(makeResponse({ content: 'OK' }));
    engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir));
    const state = await engine.run();
    // No detectMatchedRule match is installed: the exhausted batch aborts the run.
    expect(state.status).toBe('aborted');
  });
  it('should write output file when output_path is configured', async () => {
    const { tmpDir, csvPath, templatePath } = createArpeggioTestDir();
    const outputPath = join(tmpDir, 'output.txt');
    const arpeggioConfig = createArpeggioConfig(csvPath, templatePath, { outputPath });
    const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir);
    const mockAgent = vi.mocked(runAgent);
    mockAgent
      .mockResolvedValueOnce(makeResponse({ content: 'Result A' }))
      .mockResolvedValueOnce(makeResponse({ content: 'Result B' }))
      .mockResolvedValueOnce(makeResponse({ content: 'Result C' }));
    vi.mocked(detectMatchedRule).mockResolvedValueOnce({
      index: 0,
      method: 'phase1_tag',
    });
    engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir));
    await engine.run();
    // Dynamic import keeps readFileSync out of the mocked module graph above.
    const { readFileSync } = await import('node:fs');
    const outputContent = readFileSync(outputPath, 'utf-8');
    expect(outputContent).toBe('Result A\nResult B\nResult C');
  });
  it('should handle concurrency > 1', async () => {
    const { tmpDir, csvPath, templatePath } = createArpeggioTestDir();
    const arpeggioConfig = createArpeggioConfig(csvPath, templatePath, { concurrency: 3 });
    const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir);
    const mockAgent = vi.mocked(runAgent);
    mockAgent
      .mockResolvedValueOnce(makeResponse({ content: 'A' }))
      .mockResolvedValueOnce(makeResponse({ content: 'B' }))
      .mockResolvedValueOnce(makeResponse({ content: 'C' }));
    vi.mocked(detectMatchedRule).mockResolvedValueOnce({
      index: 0,
      method: 'phase1_tag',
    });
    engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir));
    const state = await engine.run();
    // Only completion and call count are asserted here — merged order with
    // concurrent batches is covered by the merge unit tests.
    expect(state.status).toBe('completed');
    expect(mockAgent).toHaveBeenCalledTimes(3);
  });
  it('should use custom merge function when configured', async () => {
    const { tmpDir, csvPath, templatePath } = createArpeggioTestDir();
    const arpeggioConfig = createArpeggioConfig(csvPath, templatePath, {
      merge: {
        strategy: 'custom',
        inlineJs: 'return results.filter(r => r.success).map(r => r.content).join(" | ");',
      },
    });
    const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir);
    const mockAgent = vi.mocked(runAgent);
    mockAgent
      .mockResolvedValueOnce(makeResponse({ content: 'X' }))
      .mockResolvedValueOnce(makeResponse({ content: 'Y' }))
      .mockResolvedValueOnce(makeResponse({ content: 'Z' }));
    vi.mocked(detectMatchedRule).mockResolvedValueOnce({
      index: 0,
      method: 'phase1_tag',
    });
    engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir));
    const state = await engine.run();
    expect(state.status).toBe('completed');
    const output = state.movementOutputs.get('process');
    expect(output!.content).toBe('X | Y | Z');
  });
});

View File

@ -12,6 +12,8 @@ export type {
SessionState, SessionState,
PieceRule, PieceRule,
PieceMovement, PieceMovement,
ArpeggioMovementConfig,
ArpeggioMergeMovementConfig,
LoopDetectionConfig, LoopDetectionConfig,
LoopMonitorConfig, LoopMonitorConfig,
LoopMonitorJudge, LoopMonitorJudge,

View File

@ -114,12 +114,48 @@ export interface PieceMovement {
passPreviousResponse: boolean; passPreviousResponse: boolean;
/** Sub-movements to execute in parallel. When set, this movement runs all sub-movements concurrently. */ /** Sub-movements to execute in parallel. When set, this movement runs all sub-movements concurrently. */
parallel?: PieceMovement[]; parallel?: PieceMovement[];
/** Arpeggio configuration for data-driven batch processing. When set, this movement reads from a data source, expands templates, and calls LLM per batch. */
arpeggio?: ArpeggioMovementConfig;
/** Resolved policy content strings (from piece-level policies map, resolved at parse time) */ /** Resolved policy content strings (from piece-level policies map, resolved at parse time) */
policyContents?: string[]; policyContents?: string[];
/** Resolved knowledge content strings (from piece-level knowledge map, resolved at parse time) */ /** Resolved knowledge content strings (from piece-level knowledge map, resolved at parse time) */
knowledgeContents?: string[]; knowledgeContents?: string[];
} }
/** Merge configuration for arpeggio results (internal, normalized form) */
export interface ArpeggioMergeMovementConfig {
  /** Merge strategy: 'concat' (default), 'custom' */
  readonly strategy: 'concat' | 'custom';
  /** Inline JS merge function body (for custom strategy). Receives `results` and must return a string. */
  readonly inlineJs?: string;
  /** Path to external JS merge file (for custom strategy, resolved to absolute). Must default-export a merge function. */
  readonly filePath?: string;
  /** Separator for concat strategy (default: '\n') */
  readonly separator?: string;
}
/** Arpeggio configuration for data-driven batch processing movements (internal, normalized form) */
export interface ArpeggioMovementConfig {
  /** Data source type (e.g., 'csv'); non-built-in values are treated as custom JS module specifiers */
  readonly source: string;
  /** Path to the data source file (resolved to absolute) */
  readonly sourcePath: string;
  /** Number of rows per batch (default: 1) */
  readonly batchSize: number;
  /** Number of concurrent LLM calls (default: 1) */
  readonly concurrency: number;
  /** Path to prompt template file (resolved to absolute) */
  readonly templatePath: string;
  /** Merge configuration; defaults to { strategy: 'concat' } when omitted in YAML */
  readonly merge: ArpeggioMergeMovementConfig;
  /** Maximum retry attempts per batch (default: 2) */
  readonly maxRetries: number;
  /** Delay between retries in ms (default: 1000) */
  readonly retryDelayMs: number;
  /** Optional output file path (resolved to absolute); merged content is written here when set */
  readonly outputPath?: string;
}
/** Loop detection configuration */ /** Loop detection configuration */
export interface LoopDetectionConfig { export interface LoopDetectionConfig {
/** Maximum consecutive runs of the same step before triggering (default: 10) */ /** Maximum consecutive runs of the same step before triggering (default: 10) */

View File

@ -130,6 +130,46 @@ export const PieceRuleSchema = z.object({
interactive_only: z.boolean().optional(), interactive_only: z.boolean().optional(),
}); });
/** Arpeggio merge configuration schema (raw YAML shape) */
export const ArpeggioMergeRawSchema = z.object({
  /** Merge strategy: 'concat' or 'custom' (defaults to 'concat' when omitted) */
  strategy: z.enum(['concat', 'custom']).optional().default('concat'),
  /** Inline JS function body for custom merge */
  inline_js: z.string().optional(),
  /** External JS file path for custom merge */
  file: z.string().optional(),
  /** Separator for concat strategy */
  separator: z.string().optional(),
}).refine(
  // 'custom' must come with an implementation: inline JS or an external file.
  (data) => data.strategy !== 'custom' || data.inline_js != null || data.file != null,
  { message: "Custom merge strategy requires either 'inline_js' or 'file'" }
).refine(
  // NOTE(review): because strategy defaults to 'concat', a merge block that
  // supplies inline_js/file WITHOUT an explicit `strategy: custom` is rejected
  // by this refine — confirm this strictness is intended.
  (data) => data.strategy !== 'concat' || (data.inline_js == null && data.file == null),
  { message: "Concat merge strategy does not accept 'inline_js' or 'file'" }
);
/** Arpeggio configuration schema for data-driven batch processing (raw YAML shape) */
export const ArpeggioConfigRawSchema = z.object({
  /** Data source type (e.g., 'csv'); other values are treated as custom module specifiers by the factory */
  source: z.string().min(1),
  /** Path to the data source file (relative paths are resolved at parse time) */
  source_path: z.string().min(1),
  /** Number of rows per batch (default: 1) */
  batch_size: z.number().int().positive().optional().default(1),
  /** Number of concurrent LLM calls (default: 1) */
  concurrency: z.number().int().positive().optional().default(1),
  /** Path to prompt template file */
  template: z.string().min(1),
  /** Merge configuration; when omitted the normalizer falls back to concat */
  merge: ArpeggioMergeRawSchema.optional(),
  /** Maximum retry attempts per batch (default: 2); 0 disables retries */
  max_retries: z.number().int().min(0).optional().default(2),
  /** Delay between retries in ms (default: 1000) */
  retry_delay_ms: z.number().int().min(0).optional().default(1000),
  /** Optional output file path */
  output_path: z.string().optional(),
});
/** Sub-movement schema for parallel execution */ /** Sub-movement schema for parallel execution */
export const ParallelSubMovementRawSchema = z.object({ export const ParallelSubMovementRawSchema = z.object({
name: z.string().min(1), name: z.string().min(1),
@ -190,6 +230,8 @@ export const PieceMovementRawSchema = z.object({
pass_previous_response: z.boolean().optional().default(true), pass_previous_response: z.boolean().optional().default(true),
/** Sub-movements to execute in parallel */ /** Sub-movements to execute in parallel */
parallel: z.array(ParallelSubMovementRawSchema).optional(), parallel: z.array(ParallelSubMovementRawSchema).optional(),
/** Arpeggio configuration for data-driven batch processing */
arpeggio: ArpeggioConfigRawSchema.optional(),
}); });
/** Loop monitor rule schema */ /** Loop monitor rule schema */

View File

@ -31,6 +31,8 @@ export type {
OutputContractEntry, OutputContractEntry,
McpServerConfig, McpServerConfig,
PieceMovement, PieceMovement,
ArpeggioMovementConfig,
ArpeggioMergeMovementConfig,
LoopDetectionConfig, LoopDetectionConfig,
LoopMonitorConfig, LoopMonitorConfig,
LoopMonitorJudge, LoopMonitorJudge,

View File

@ -0,0 +1,133 @@
/**
* CSV data source for arpeggio movements.
*
* Reads CSV files and returns data in batches for template expansion.
* Handles quoted fields, escaped quotes, and various line endings.
*/
import { readFileSync } from 'node:fs';
import type { ArpeggioDataSource, DataBatch, DataRow } from './types.js';
/**
 * Parse a CSV string into an array of string arrays (rows of fields).
 *
 * Handles RFC-4180-style quoting: a field starting with '"' is quoted,
 * '""' inside quotes is a literal quote, and quoted fields may contain
 * commas and newlines. Rows may end with '\n', '\r\n', or a bare '\r'.
 */
export function parseCsv(content: string): string[][] {
  const rows: string[][] = [];
  let row: string[] = [];
  let field = '';
  let quoted = false;
  let pos = 0;

  // Close the current field / row; shared by the delimiter branches below.
  const endField = (): void => {
    row.push(field);
    field = '';
  };
  const endRow = (): void => {
    endField();
    rows.push(row);
    row = [];
  };

  while (pos < content.length) {
    const ch = content[pos]!;
    if (quoted) {
      if (ch === '"') {
        if (content[pos + 1] === '"') {
          // Escaped quote ("") → literal quote character.
          field += '"';
          pos += 2;
        } else {
          // Closing quote: leave quoted mode.
          quoted = false;
          pos++;
        }
      } else {
        field += ch;
        pos++;
      }
    } else if (ch === '"' && field === '') {
      // Opening quote is only recognized at the start of a field.
      quoted = true;
      pos++;
    } else if (ch === ',') {
      endField();
      pos++;
    } else if (ch === '\r') {
      // Row terminator: \r\n consumes two characters, bare \r just one.
      endRow();
      pos += content[pos + 1] === '\n' ? 2 : 1;
    } else if (ch === '\n') {
      endRow();
      pos++;
    } else {
      field += ch;
      pos++;
    }
  }

  // Flush a trailing row that has no final line terminator.
  if (field !== '' || row.length > 0) {
    endField();
    rows.push(row);
  }
  return rows;
}
/**
 * Convert parsed CSV rows into DataRow objects using the header row.
 * Missing trailing fields become '' ; fields beyond the headers are dropped.
 */
function rowsToDataRows(headers: readonly string[], dataRows: readonly string[][]): DataRow[] {
  return dataRows.map((fields) => {
    const entries = headers.map((header, col) => [header, fields[col] ?? ''] as const);
    return Object.fromEntries(entries);
  });
}
/**
 * Split an array into chunks of the given size; the last chunk may be shorter.
 *
 * @throws Error when size is not a positive integer — previously a size of 0
 *         (or a negative size) made the loop below spin forever.
 */
function chunk<T>(array: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new Error(`chunk size must be a positive integer, got ${size}`);
  }
  const chunks: T[][] = [];
  for (let i = 0; i < array.length; i += size) {
    chunks.push(array.slice(i, i + size));
  }
  return chunks;
}
export class CsvDataSource implements ArpeggioDataSource {
  constructor(private readonly filePath: string) {}

  /**
   * Read the CSV file and return its data rows grouped into batches.
   *
   * The first parsed row is treated as the header; remaining rows become
   * DataRow objects keyed by header name.
   *
   * @throws Error when the file contains no data rows (empty or header-only).
   */
  async readBatches(batchSize: number): Promise<readonly DataBatch[]> {
    const content = readFileSync(this.filePath, 'utf-8');
    const parsed = parseCsv(content);
    if (parsed.length < 2) {
      throw new Error(`CSV file has no data rows: ${this.filePath}`);
    }
    const headers = parsed[0]!;
    // Drop fully-blank lines (parsed as a single empty field) so they do not
    // become spurious all-empty data rows.
    const dataRowArrays = parsed
      .slice(1)
      .filter((row) => row.length > 1 || row[0] !== '');
    if (dataRowArrays.length === 0) {
      throw new Error(`CSV file has no data rows: ${this.filePath}`);
    }
    const dataRows = rowsToDataRows(headers, dataRowArrays);
    const chunks = chunk(dataRows, batchSize);
    const totalBatches = chunks.length;
    return chunks.map((rows, index) => ({
      rows,
      batchIndex: index,
      totalBatches,
    }));
  }
}

View File

@ -0,0 +1,41 @@
/**
* Factory for creating data source instances.
*
* Maps source type names to their implementations.
* Built-in: 'csv'. Users can extend with custom JS modules.
*/
import type { ArpeggioDataSource } from './types.js';
import { CsvDataSource } from './csv-data-source.js';
/** Built-in data source type mapping */
const BUILTIN_SOURCES: Record<string, (path: string) => ArpeggioDataSource> = {
  csv: (path) => new CsvDataSource(path),
};

/**
 * Create a data source instance by type and path.
 *
 * For built-in types ('csv'), uses the registered factory.
 * For custom types, the source type itself is treated as a JS module
 * specifier that must default-export a factory `(path) => ArpeggioDataSource`.
 */
export async function createDataSource(
  sourceType: string,
  sourcePath: string,
): Promise<ArpeggioDataSource> {
  const makeBuiltin = BUILTIN_SOURCES[sourceType];
  if (makeBuiltin !== undefined) {
    return makeBuiltin(sourcePath);
  }
  // Not built-in: dynamically load the custom module.
  const loaded = await import(sourceType) as {
    default?: (path: string) => ArpeggioDataSource;
  };
  const factory = loaded.default;
  if (typeof factory !== 'function') {
    throw new Error(
      `Custom data source module "${sourceType}" must export a default factory function`
    );
  }
  return factory(sourcePath);
}

View File

@ -0,0 +1,78 @@
/**
* Merge processing for arpeggio batch results.
*
* Supports two merge strategies:
* - 'concat': Simple concatenation with configurable separator
* - 'custom': User-provided merge function (inline JS or external file)
*/
import { writeFileSync } from 'node:fs';
import type { ArpeggioMergeMovementConfig, BatchResult, MergeFn } from './types.js';
/**
 * Create a concat merge function with the given separator.
 * Failed batches are skipped; successful contents are joined in batch order.
 */
function createConcatMerge(separator: string): MergeFn {
  return (results) => {
    const successful = results.filter((r) => r.success);
    const ordered = [...successful].sort((a, b) => a.batchIndex - b.batchIndex);
    return ordered.map((r) => r.content).join(separator);
  };
}
/**
 * Create a merge function from inline JavaScript.
 *
 * The inline JS receives `results` as the function parameter
 * (readonly BatchResult[]) and must return a string.
 *
 * NOTE: `new Function` executes arbitrary JS from the piece config;
 * the piece definition must be treated as trusted input.
 */
function createInlineJsMerge(jsBody: string): MergeFn {
  const compiled = new Function('results', jsBody) as MergeFn;
  return (results) => {
    const merged = compiled(results);
    if (typeof merged !== 'string') {
      throw new Error(`Inline JS merge function must return a string, got ${typeof merged}`);
    }
    return merged;
  };
}
/**
 * Create a merge function from an external JS file.
 *
 * The file must export a default function: (results: BatchResult[]) => string
 *
 * The absolute path is converted to a file:// URL before the dynamic import:
 * a raw Windows absolute path (e.g. "C:\\...") is not a valid ESM specifier
 * and would throw ERR_UNSUPPORTED_ESM_URL_SCHEME.
 */
async function createFileMerge(filePath: string): Promise<MergeFn> {
  const { pathToFileURL } = await import('node:url');
  const module = await import(pathToFileURL(filePath).href) as { default?: MergeFn };
  if (typeof module.default !== 'function') {
    throw new Error(`Merge file "${filePath}" must export a default function`);
  }
  return module.default;
}
/**
 * Build a merge function from the arpeggio merge configuration.
 *
 * For 'concat' strategy: returns a simple join function.
 * For 'custom' strategy: loads from inline JS or external file
 * (inline takes precedence when both are present).
 */
export async function buildMergeFn(config: ArpeggioMergeMovementConfig): Promise<MergeFn> {
  if (config.strategy === 'custom') {
    if (config.inlineJs) {
      return createInlineJsMerge(config.inlineJs);
    }
    if (config.filePath) {
      return createFileMerge(config.filePath);
    }
    throw new Error('Custom merge strategy requires either inline_js or file path');
  }
  return createConcatMerge(config.separator ?? '\n');
}
/**
 * Write merged output to a file if output_path is configured.
 * Overwrites any existing file; the parent directory must already exist
 * (writeFileSync does not create directories).
 */
export function writeMergedOutput(outputPath: string, content: string): void {
  writeFileSync(outputPath, content, 'utf-8');
}

View File

@ -0,0 +1,72 @@
/**
* Template expansion for arpeggio movements.
*
* Expands placeholders in prompt templates using data from batches:
* - {line:N} entire row N as "key: value" pairs (1-based)
* - {col:N:name} specific column value from row N (1-based)
* - {batch_index} 0-based batch index
* - {total_batches} total number of batches
*/
import { readFileSync } from 'node:fs';
import type { DataBatch, DataRow } from './types.js';
/** Format a single data row as "key: value" lines */
function formatRow(row: DataRow): string {
  return Object.entries(row)
    .map(([key, value]) => `${key}: ${value}`)
    .join('\n');
}

/**
 * Expand placeholders in a template string using batch data.
 *
 * Supported placeholders:
 * - {line:N} Row N (1-based) formatted as "key: value" lines
 * - {col:N:name} Column "name" from row N (1-based)
 * - {batch_index} 0-based batch index
 * - {total_batches} Total number of batches
 *
 * @throws Error when a placeholder references a row outside the batch or an
 *         unknown column.
 */
export function expandTemplate(template: string, batch: DataBatch): string {
  let result = template;
  // Replace {batch_index} and {total_batches}
  result = result.replace(/\{batch_index\}/g, String(batch.batchIndex));
  result = result.replace(/\{total_batches\}/g, String(batch.totalBatches));
  // Replace {col:N:name}. Column names may contain any character except '}'
  // (the previous \w+ pattern silently skipped names with '-', '.' or spaces,
  // leaving the placeholder unexpanded in the prompt).
  result = result.replace(/\{col:(\d+):([^}]+)\}/g, (_match, indexStr: string, colName: string) => {
    const rowIndex = parseInt(indexStr, 10) - 1;
    if (rowIndex < 0 || rowIndex >= batch.rows.length) {
      throw new Error(
        `Template placeholder {col:${indexStr}:${colName}} references row ${indexStr} but batch has ${batch.rows.length} rows`
      );
    }
    const row = batch.rows[rowIndex]!;
    const value = row[colName];
    if (value === undefined) {
      throw new Error(
        `Template placeholder {col:${indexStr}:${colName}} references unknown column "${colName}"`
      );
    }
    // Function replacers use the return value literally, so '$' sequences in
    // data values are safe from replace()'s substitution patterns.
    return value;
  });
  // Replace {line:N}
  result = result.replace(/\{line:(\d+)\}/g, (_match, indexStr: string) => {
    const rowIndex = parseInt(indexStr, 10) - 1;
    if (rowIndex < 0 || rowIndex >= batch.rows.length) {
      throw new Error(
        `Template placeholder {line:${indexStr}} references row ${indexStr} but batch has ${batch.rows.length} rows`
      );
    }
    return formatRow(batch.rows[rowIndex]!);
  });
  return result;
}
/** Read a prompt template file and return its UTF-8 text content. */
export function loadTemplate(templatePath: string): string {
  const text = readFileSync(templatePath, 'utf-8');
  return text;
}

View File

@ -0,0 +1,46 @@
/**
* Arpeggio movement internal type definitions.
*
* Configuration types (ArpeggioMovementConfig, ArpeggioMergeMovementConfig)
* live in models/piece-types.ts as part of PieceMovement.
* This file defines runtime types used internally by the arpeggio module.
*/
// Re-export the normalized config types so arpeggio modules can import
// everything they need from this one file.
export type {
  ArpeggioMovementConfig,
  ArpeggioMergeMovementConfig,
} from '../../models/piece-types.js';

/** A single row of data from a data source (column name → value; all values are strings) */
export type DataRow = Record<string, string>;

/** A batch of rows read from a data source */
export interface DataBatch {
  /** The rows in this batch */
  readonly rows: readonly DataRow[];
  /** 0-based index of this batch in the overall data set */
  readonly batchIndex: number;
  /** Total number of batches (known after full read) */
  readonly totalBatches: number;
}

/** Interface for data source implementations */
export interface ArpeggioDataSource {
  /** Read all batches from the data source. Returns an array of DataBatch. */
  readBatches(batchSize: number): Promise<readonly DataBatch[]>;
}

/** Result of a single LLM call for one batch */
export interface BatchResult {
  /** 0-based index of the batch */
  readonly batchIndex: number;
  /** LLM response content (empty string when the batch failed) */
  readonly content: string;
  /** Whether this result was successful */
  readonly success: boolean;
  /** Error message if failed */
  readonly error?: string;
}

/** Merge function signature: takes all batch results, returns merged string */
export type MergeFn = (results: readonly BatchResult[]) => string;

View File

@ -0,0 +1,268 @@
/**
* Executes arpeggio piece movements: data-driven batch processing.
*
* Reads data from a source, expands templates with batch data,
* calls LLM for each batch (with concurrency control),
* merges results, and returns an aggregated response.
*/
import type {
PieceMovement,
PieceState,
AgentResponse,
} from '../../models/types.js';
import type { ArpeggioMovementConfig, BatchResult, DataBatch } from '../arpeggio/types.js';
import { createDataSource } from '../arpeggio/data-source-factory.js';
import { loadTemplate, expandTemplate } from '../arpeggio/template.js';
import { buildMergeFn, writeMergedOutput } from '../arpeggio/merge.js';
import { runAgent, type RunAgentOptions } from '../../../agents/runner.js';
import { detectMatchedRule } from '../evaluation/index.js';
import { incrementMovementIteration } from './state-manager.js';
import { createLogger } from '../../../shared/utils/index.js';
import type { OptionsBuilder } from './OptionsBuilder.js';
import type { PhaseName } from '../types.js';
const log = createLogger('arpeggio-runner');
/** Dependencies injected into ArpeggioRunner by the engine. */
export interface ArpeggioRunnerDeps {
  /** Builds the RunAgentOptions used for each batch's agent call. */
  readonly optionsBuilder: OptionsBuilder;
  /** Returns the current working directory (used for rule evaluation). */
  readonly getCwd: () => string;
  /** Returns whether the engine is running in interactive mode. */
  readonly getInteractive: () => boolean;
  /** Maps merged agent output to a rule index for the given movement. */
  readonly detectRuleIndex: (content: string, movementName: string) => number;
  /** AI-judge fallback: picks the matching condition index for the output. */
  readonly callAiJudge: (
    agentOutput: string,
    conditions: Array<{ index: number; text: string }>,
    options: { cwd: string }
  ) => Promise<number>;
  /** Optional hook fired before each batch executes (phase 1, 'execute'). */
  readonly onPhaseStart?: (step: PieceMovement, phase: 1 | 2 | 3, phaseName: PhaseName, instruction: string) => void;
  /** Optional hook fired after each batch completes, with its content and 'done'/'error' status. */
  readonly onPhaseComplete?: (step: PieceMovement, phase: 1 | 2 | 3, phaseName: PhaseName, content: string, status: string, error?: string) => void;
}
/**
 * Simple semaphore for controlling concurrency.
 * Limits the number of concurrent async operations.
 */
class Semaphore {
  private active = 0;
  private readonly queue: Array<() => void> = [];

  constructor(private readonly maxConcurrency: number) {}

  /** Wait until a slot is available, then occupy it. */
  async acquire(): Promise<void> {
    if (this.active >= this.maxConcurrency) {
      // At capacity: park until release() hands us a slot.
      await new Promise<void>((resolve) => {
        this.queue.push(resolve);
      });
      return;
    }
    this.active++;
  }

  /** Free a slot; hands it directly to the next waiter when one is queued. */
  release(): void {
    const next = this.queue.shift();
    if (next !== undefined) {
      // The waiter inherits the slot, so the active count stays unchanged.
      next();
    } else {
      this.active--;
    }
  }
}
/**
 * Execute a single batch with retry logic.
 *
 * Expands the template once, then calls the agent up to maxRetries + 1 times
 * (attempt 0 is the initial call). Two failure modes take the same retry
 * path: an 'error' status response and a thrown exception. Never throws —
 * failures are reported via BatchResult.success / BatchResult.error.
 */
async function executeBatchWithRetry(
  batch: DataBatch,
  template: string,
  persona: string | undefined,
  agentOptions: RunAgentOptions,
  maxRetries: number,
  retryDelayMs: number,
): Promise<BatchResult> {
  // Template expansion happens once per batch; an expansion error is not retried.
  const prompt = expandTemplate(template, batch);
  let lastError: string | undefined;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const response = await runAgent(persona, prompt, agentOptions);
      if (response.status === 'error') {
        // Prefer the explicit error, fall back to content, then a generic message.
        lastError = response.error ?? response.content ?? 'Agent returned error status';
        // NOTE(review): this says "retrying" even on the final attempt — confirm intended.
        log.info('Batch execution failed, retrying', {
          batchIndex: batch.batchIndex,
          attempt: attempt + 1,
          maxRetries,
          error: lastError,
        });
        if (attempt < maxRetries) {
          await delay(retryDelayMs);
          continue;
        }
        // Retries exhausted on an error-status response: report failure here.
        return {
          batchIndex: batch.batchIndex,
          content: '',
          success: false,
          error: lastError,
        };
      }
      // Success: return the agent's content for this batch.
      return {
        batchIndex: batch.batchIndex,
        content: response.content,
        success: true,
      };
    } catch (error) {
      // Thrown errors take the same retry/delay path as error-status responses.
      lastError = error instanceof Error ? error.message : String(error);
      log.info('Batch execution threw, retrying', {
        batchIndex: batch.batchIndex,
        attempt: attempt + 1,
        maxRetries,
        error: lastError,
      });
      if (attempt < maxRetries) {
        await delay(retryDelayMs);
        continue;
      }
      // Last attempt threw: fall through to the failure return below.
    }
  }
  // Reached only when the final attempt threw; lastError holds its message.
  return {
    batchIndex: batch.batchIndex,
    content: '',
    success: false,
    error: lastError,
  };
}
/** Resolve after approximately `ms` milliseconds. */
function delay(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}
export class ArpeggioRunner {
  constructor(
    private readonly deps: ArpeggioRunnerDeps,
  ) {}

  /**
   * Run an arpeggio movement: read data, expand templates, call LLM,
   * merge results, and return an aggregated response.
   *
   * Throws when the movement has no arpeggio config, when the data source
   * yields no batches, or when any batch still fails after its retries.
   * On success the merged content is stored in state.movementOutputs and
   * state.lastOutput.
   */
  async runArpeggioMovement(
    step: PieceMovement,
    state: PieceState,
  ): Promise<{ response: AgentResponse; instruction: string }> {
    const arpeggioConfig = step.arpeggio;
    if (!arpeggioConfig) {
      throw new Error(`Movement "${step.name}" has no arpeggio configuration`);
    }
    // Mutates the per-movement iteration counter in state; the returned value
    // is only used for the debug log below.
    const movementIteration = incrementMovementIteration(state, step.name);
    log.debug('Running arpeggio movement', {
      movement: step.name,
      source: arpeggioConfig.source,
      batchSize: arpeggioConfig.batchSize,
      concurrency: arpeggioConfig.concurrency,
      movementIteration,
    });
    const dataSource = await createDataSource(arpeggioConfig.source, arpeggioConfig.sourcePath);
    const batches = await dataSource.readBatches(arpeggioConfig.batchSize);
    if (batches.length === 0) {
      throw new Error(`Data source returned no batches for movement "${step.name}"`);
    }
    log.info('Arpeggio data loaded', {
      movement: step.name,
      batchCount: batches.length,
      batchSize: arpeggioConfig.batchSize,
    });
    const template = loadTemplate(arpeggioConfig.templatePath);
    const agentOptions = this.deps.optionsBuilder.buildAgentOptions(step);
    // One shared semaphore bounds concurrent agent calls across all batches.
    const semaphore = new Semaphore(arpeggioConfig.concurrency);
    const results = await this.executeBatches(
      batches,
      template,
      step,
      agentOptions,
      arpeggioConfig,
      semaphore,
    );
    // All-or-nothing: a single batch that failed after retries aborts the movement.
    const failedBatches = results.filter((r) => !r.success);
    if (failedBatches.length > 0) {
      const errorDetails = failedBatches
        .map((r) => `batch ${r.batchIndex}: ${r.error}`)
        .join('; ');
      throw new Error(
        `Arpeggio movement "${step.name}" failed: ${failedBatches.length}/${results.length} batches failed (${errorDetails})`
      );
    }
    const mergeFn = await buildMergeFn(arpeggioConfig.merge);
    const mergedContent = mergeFn(results);
    if (arpeggioConfig.outputPath) {
      writeMergedOutput(arpeggioConfig.outputPath, mergedContent);
      log.info('Arpeggio output written', { outputPath: arpeggioConfig.outputPath });
    }
    const ruleCtx = {
      state,
      cwd: this.deps.getCwd(),
      interactive: this.deps.getInteractive(),
      detectRuleIndex: this.deps.detectRuleIndex,
      callAiJudge: this.deps.callAiJudge,
    };
    // Rule matching runs on the merged content; the third argument is passed
    // as '' here (no separate secondary output for arpeggio movements —
    // TODO confirm against detectMatchedRule's signature).
    const match = await detectMatchedRule(step, mergedContent, '', ruleCtx);
    const aggregatedResponse: AgentResponse = {
      persona: step.name,
      status: 'done',
      content: mergedContent,
      timestamp: new Date(),
      ...(match && { matchedRuleIndex: match.index, matchedRuleMethod: match.method }),
    };
    state.movementOutputs.set(step.name, aggregatedResponse);
    state.lastOutput = aggregatedResponse;
    const instruction = `[Arpeggio] ${step.name}: ${batches.length} batches, source=${arpeggioConfig.source}`;
    return { response: aggregatedResponse, instruction };
  }

  /** Execute all batches with concurrency control */
  private async executeBatches(
    batches: readonly DataBatch[],
    template: string,
    step: PieceMovement,
    agentOptions: RunAgentOptions,
    config: ArpeggioMovementConfig,
    semaphore: Semaphore,
  ): Promise<BatchResult[]> {
    // All batch promises start immediately; the semaphore gates how many are
    // actively calling the agent at any one time.
    const promises = batches.map(async (batch) => {
      await semaphore.acquire();
      try {
        this.deps.onPhaseStart?.(step, 1, 'execute', `[Arpeggio batch ${batch.batchIndex + 1}/${batch.totalBatches}]`);
        const result = await executeBatchWithRetry(
          batch,
          template,
          step.persona,
          agentOptions,
          config.maxRetries,
          config.retryDelayMs,
        );
        this.deps.onPhaseComplete?.(
          step, 1, 'execute',
          result.content,
          result.success ? 'done' : 'error',
          result.error,
        );
        return result;
      } finally {
        // Always free the slot, even if the batch helper or a hook throws.
        semaphore.release();
      }
    });
    // Promise.all is safe here: executeBatchWithRetry reports failures via
    // BatchResult.success rather than rejecting.
    return Promise.all(promises);
  }
}

View File

@ -31,6 +31,7 @@ import { generateReportDir, getErrorMessage, createLogger } from '../../../share
import { OptionsBuilder } from './OptionsBuilder.js'; import { OptionsBuilder } from './OptionsBuilder.js';
import { MovementExecutor } from './MovementExecutor.js'; import { MovementExecutor } from './MovementExecutor.js';
import { ParallelRunner } from './ParallelRunner.js'; import { ParallelRunner } from './ParallelRunner.js';
import { ArpeggioRunner } from './ArpeggioRunner.js';
const log = createLogger('engine'); const log = createLogger('engine');
@ -60,6 +61,7 @@ export class PieceEngine extends EventEmitter {
private readonly optionsBuilder: OptionsBuilder; private readonly optionsBuilder: OptionsBuilder;
private readonly movementExecutor: MovementExecutor; private readonly movementExecutor: MovementExecutor;
private readonly parallelRunner: ParallelRunner; private readonly parallelRunner: ParallelRunner;
private readonly arpeggioRunner: ArpeggioRunner;
private readonly detectRuleIndex: (content: string, movementName: string) => number; private readonly detectRuleIndex: (content: string, movementName: string) => number;
private readonly callAiJudge: ( private readonly callAiJudge: (
agentOutput: string, agentOutput: string,
@ -139,6 +141,20 @@ export class PieceEngine extends EventEmitter {
}, },
}); });
this.arpeggioRunner = new ArpeggioRunner({
optionsBuilder: this.optionsBuilder,
getCwd: () => this.cwd,
getInteractive: () => this.options.interactive === true,
detectRuleIndex: this.detectRuleIndex,
callAiJudge: this.callAiJudge,
onPhaseStart: (step, phase, phaseName, instruction) => {
this.emit('phase:start', step, phase, phaseName, instruction);
},
onPhaseComplete: (step, phase, phaseName, content, phaseStatus, error) => {
this.emit('phase:complete', step, phase, phaseName, content, phaseStatus, error);
},
});
log.debug('PieceEngine initialized', { log.debug('PieceEngine initialized', {
piece: config.name, piece: config.name,
movements: config.movements.map(s => s.name), movements: config.movements.map(s => s.name),
@ -290,7 +306,7 @@ export class PieceEngine extends EventEmitter {
} }
} }
/** Run a single movement (delegates to ParallelRunner if movement has parallel sub-movements) */ /** Run a single movement (delegates to ParallelRunner, ArpeggioRunner, or MovementExecutor) */
private async runMovement(step: PieceMovement, prebuiltInstruction?: string): Promise<{ response: AgentResponse; instruction: string }> { private async runMovement(step: PieceMovement, prebuiltInstruction?: string): Promise<{ response: AgentResponse; instruction: string }> {
const updateSession = this.updatePersonaSession.bind(this); const updateSession = this.updatePersonaSession.bind(this);
let result: { response: AgentResponse; instruction: string }; let result: { response: AgentResponse; instruction: string };
@ -299,6 +315,10 @@ export class PieceEngine extends EventEmitter {
result = await this.parallelRunner.runParallelMovement( result = await this.parallelRunner.runParallelMovement(
step, this.state, this.task, this.config.maxIterations, updateSession, step, this.state, this.task, this.config.maxIterations, updateSession,
); );
} else if (step.arpeggio) {
result = await this.arpeggioRunner.runArpeggioMovement(
step, this.state,
);
} else { } else {
result = await this.movementExecutor.runNormalMovement( result = await this.movementExecutor.runNormalMovement(
step, this.state, this.task, this.config.maxIterations, updateSession, prebuiltInstruction, step, this.state, this.task, this.config.maxIterations, updateSession, prebuiltInstruction,
@ -492,10 +512,11 @@ export class PieceEngine extends EventEmitter {
this.state.iteration++; this.state.iteration++;
// Build instruction before emitting movement:start so listeners can log it // Build instruction before emitting movement:start so listeners can log it.
const isParallel = movement.parallel && movement.parallel.length > 0; // Parallel and arpeggio movements handle iteration incrementing internally.
const isDelegated = (movement.parallel && movement.parallel.length > 0) || !!movement.arpeggio;
let prebuiltInstruction: string | undefined; let prebuiltInstruction: string | undefined;
if (!isParallel) { if (!isDelegated) {
const movementIteration = incrementMovementIteration(this.state, movement.name); const movementIteration = incrementMovementIteration(this.state, movement.name);
prebuiltInstruction = this.movementExecutor.buildInstruction( prebuiltInstruction = this.movementExecutor.buildInstruction(
movement, movementIteration, this.state, this.task, this.config.maxIterations, movement, movementIteration, this.state, this.task, this.config.maxIterations,

View File

@ -8,6 +8,7 @@ export { PieceEngine } from './PieceEngine.js';
export { MovementExecutor } from './MovementExecutor.js'; export { MovementExecutor } from './MovementExecutor.js';
export type { MovementExecutorDeps } from './MovementExecutor.js'; export type { MovementExecutorDeps } from './MovementExecutor.js';
export { ParallelRunner } from './ParallelRunner.js'; export { ParallelRunner } from './ParallelRunner.js';
export { ArpeggioRunner } from './ArpeggioRunner.js';
export { OptionsBuilder } from './OptionsBuilder.js'; export { OptionsBuilder } from './OptionsBuilder.js';
export { CycleDetector } from './cycle-detector.js'; export { CycleDetector } from './cycle-detector.js';
export type { CycleCheckResult } from './cycle-detector.js'; export type { CycleCheckResult } from './cycle-detector.js';

View File

@ -6,11 +6,11 @@
*/ */
import { readFileSync, existsSync } from 'node:fs'; import { readFileSync, existsSync } from 'node:fs';
import { dirname } from 'node:path'; import { dirname, resolve } from 'node:path';
import { parse as parseYaml } from 'yaml'; import { parse as parseYaml } from 'yaml';
import type { z } from 'zod'; import type { z } from 'zod';
import { PieceConfigRawSchema, PieceMovementRawSchema } from '../../../core/models/index.js'; import { PieceConfigRawSchema, PieceMovementRawSchema } from '../../../core/models/index.js';
import type { PieceConfig, PieceMovement, PieceRule, OutputContractEntry, OutputContractLabelPath, OutputContractItem, LoopMonitorConfig, LoopMonitorJudge } from '../../../core/models/index.js'; import type { PieceConfig, PieceMovement, PieceRule, OutputContractEntry, OutputContractLabelPath, OutputContractItem, LoopMonitorConfig, LoopMonitorJudge, ArpeggioMovementConfig, ArpeggioMergeMovementConfig } from '../../../core/models/index.js';
import { getLanguage } from '../global/globalConfig.js'; import { getLanguage } from '../global/globalConfig.js';
import { import {
type PieceSections, type PieceSections,
@ -150,6 +150,35 @@ function normalizeRule(r: {
}; };
} }
/**
 * Normalize raw arpeggio config from YAML into internal format.
 * Resolves all file paths against the piece directory; a missing merge
 * block falls back to the default concat strategy.
 */
function normalizeArpeggio(
  raw: RawStep['arpeggio'],
  pieceDir: string,
): ArpeggioMovementConfig | undefined {
  if (!raw) {
    return undefined;
  }
  const rawMerge = raw.merge;
  const merge: ArpeggioMergeMovementConfig = rawMerge
    ? {
        strategy: rawMerge.strategy,
        inlineJs: rawMerge.inline_js,
        filePath: rawMerge.file ? resolve(pieceDir, rawMerge.file) : undefined,
        separator: rawMerge.separator,
      }
    : { strategy: 'concat' };
  return {
    source: raw.source,
    sourcePath: resolve(pieceDir, raw.source_path),
    batchSize: raw.batch_size,
    concurrency: raw.concurrency,
    templatePath: resolve(pieceDir, raw.template),
    merge,
    maxRetries: raw.max_retries,
    retryDelayMs: raw.retry_delay_ms,
    outputPath: raw.output_path ? resolve(pieceDir, raw.output_path) : undefined,
  };
}
/** Normalize a raw step into internal PieceMovement format. */ /** Normalize a raw step into internal PieceMovement format. */
function normalizeStepFromRaw( function normalizeStepFromRaw(
step: RawStep, step: RawStep,
@ -203,6 +232,11 @@ function normalizeStepFromRaw(
result.parallel = step.parallel.map((sub: RawStep) => normalizeStepFromRaw(sub, pieceDir, sections, context)); result.parallel = step.parallel.map((sub: RawStep) => normalizeStepFromRaw(sub, pieceDir, sections, context));
} }
const arpeggioConfig = normalizeArpeggio(step.arpeggio, pieceDir);
if (arpeggioConfig) {
result.arpeggio = arpeggioConfig;
}
return result; return result;
} }