From 7e15691ba2f8c8b0870617c3166f8a7c7f616256 Mon Sep 17 00:00:00 2001 From: nrs <38722970+nrslib@users.noreply.github.com> Date: Tue, 10 Feb 2026 13:37:15 +0900 Subject: [PATCH] github-issue-200-arpeggio (#203) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: stable release時にnext dist-tagを自動同期 * takt: github-issue-200-arpeggio --- .github/workflows/auto-tag.yml | 37 +- src/__tests__/arpeggio-csv.test.ts | 136 +++++++ .../arpeggio-data-source-factory.test.ts | 50 +++ src/__tests__/arpeggio-merge.test.ts | 108 ++++++ src/__tests__/arpeggio-schema.test.ts | 332 ++++++++++++++++++ src/__tests__/arpeggio-template.test.ts | 83 +++++ src/__tests__/engine-arpeggio.test.ts | 275 +++++++++++++++ src/core/models/index.ts | 2 + src/core/models/piece-types.ts | 36 ++ src/core/models/schemas.ts | 42 +++ src/core/models/types.ts | 2 + src/core/piece/arpeggio/csv-data-source.ts | 133 +++++++ .../piece/arpeggio/data-source-factory.ts | 41 +++ src/core/piece/arpeggio/merge.ts | 78 ++++ src/core/piece/arpeggio/template.ts | 72 ++++ src/core/piece/arpeggio/types.ts | 46 +++ src/core/piece/engine/ArpeggioRunner.ts | 268 ++++++++++++++ src/core/piece/engine/PieceEngine.ts | 29 +- src/core/piece/engine/index.ts | 1 + src/infra/config/loaders/pieceParser.ts | 38 +- 20 files changed, 1802 insertions(+), 7 deletions(-) create mode 100644 src/__tests__/arpeggio-csv.test.ts create mode 100644 src/__tests__/arpeggio-data-source-factory.test.ts create mode 100644 src/__tests__/arpeggio-merge.test.ts create mode 100644 src/__tests__/arpeggio-schema.test.ts create mode 100644 src/__tests__/arpeggio-template.test.ts create mode 100644 src/__tests__/engine-arpeggio.test.ts create mode 100644 src/core/piece/arpeggio/csv-data-source.ts create mode 100644 src/core/piece/arpeggio/data-source-factory.ts create mode 100644 src/core/piece/arpeggio/merge.ts create mode 100644 src/core/piece/arpeggio/template.ts create mode 100644 
src/core/piece/arpeggio/types.ts create mode 100644 src/core/piece/engine/ArpeggioRunner.ts diff --git a/.github/workflows/auto-tag.yml b/.github/workflows/auto-tag.yml index bab7fad..502d777 100644 --- a/.github/workflows/auto-tag.yml +++ b/.github/workflows/auto-tag.yml @@ -52,12 +52,47 @@ jobs: run: | VERSION="${{ needs.tag.outputs.tag }}" VERSION="${VERSION#v}" + echo "version=$VERSION" >> "$GITHUB_OUTPUT" if echo "$VERSION" | grep -qE '(alpha|beta|rc|next)'; then echo "tag=next" >> "$GITHUB_OUTPUT" else echo "tag=latest" >> "$GITHUB_OUTPUT" fi - - run: npm publish --tag ${{ steps.npm-tag.outputs.tag }} + - name: Publish package + run: npm publish --tag ${{ steps.npm-tag.outputs.tag }} env: NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} + + - name: Sync next tag on stable release + if: steps.npm-tag.outputs.tag == 'latest' + run: | + PACKAGE_NAME=$(node -p "require('./package.json').name") + VERSION="${{ steps.npm-tag.outputs.version }}" + + for attempt in 1 2 3; do + if npm dist-tag add "${PACKAGE_NAME}@${VERSION}" next; then + exit 0 + fi + if [ "$attempt" -eq 3 ]; then + echo "Failed to sync next tag after 3 attempts." + exit 1 + fi + sleep $((attempt * 5)) + done + env: + NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} + + - name: Verify dist-tags + run: | + PACKAGE_NAME=$(node -p "require('./package.json').name") + LATEST=$(npm view "${PACKAGE_NAME}" dist-tags.latest) + NEXT=$(npm view "${PACKAGE_NAME}" dist-tags.next || true) + + echo "latest=${LATEST}" + echo "next=${NEXT}" + + if [ "${{ steps.npm-tag.outputs.tag }}" = "latest" ] && [ "${LATEST}" != "${NEXT}" ]; then + echo "Expected next to match latest on stable release, but they differ." + exit 1 + fi diff --git a/src/__tests__/arpeggio-csv.test.ts b/src/__tests__/arpeggio-csv.test.ts new file mode 100644 index 0000000..9d51793 --- /dev/null +++ b/src/__tests__/arpeggio-csv.test.ts @@ -0,0 +1,136 @@ +/** + * Tests for CSV data source parsing and batch reading. 
+ */ + +import { describe, it, expect } from 'vitest'; +import { parseCsv, CsvDataSource } from '../core/piece/arpeggio/csv-data-source.js'; +import { writeFileSync, mkdirSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { randomUUID } from 'node:crypto'; + +describe('parseCsv', () => { + it('should parse simple CSV content', () => { + const csv = 'name,age\nAlice,30\nBob,25'; + const result = parseCsv(csv); + expect(result).toEqual([ + ['name', 'age'], + ['Alice', '30'], + ['Bob', '25'], + ]); + }); + + it('should handle quoted fields', () => { + const csv = 'name,description\nAlice,"Hello, World"\nBob,"Line1"'; + const result = parseCsv(csv); + expect(result).toEqual([ + ['name', 'description'], + ['Alice', 'Hello, World'], + ['Bob', 'Line1'], + ]); + }); + + it('should handle escaped quotes (double quotes)', () => { + const csv = 'name,value\nAlice,"He said ""hello"""\nBob,simple'; + const result = parseCsv(csv); + expect(result).toEqual([ + ['name', 'value'], + ['Alice', 'He said "hello"'], + ['Bob', 'simple'], + ]); + }); + + it('should handle CRLF line endings', () => { + const csv = 'name,age\r\nAlice,30\r\nBob,25'; + const result = parseCsv(csv); + expect(result).toEqual([ + ['name', 'age'], + ['Alice', '30'], + ['Bob', '25'], + ]); + }); + + it('should handle bare CR line endings', () => { + const csv = 'name,age\rAlice,30\rBob,25'; + const result = parseCsv(csv); + expect(result).toEqual([ + ['name', 'age'], + ['Alice', '30'], + ['Bob', '25'], + ]); + }); + + it('should handle empty fields', () => { + const csv = 'a,b,c\n1,,3\n,,'; + const result = parseCsv(csv); + expect(result).toEqual([ + ['a', 'b', 'c'], + ['1', '', '3'], + ['', '', ''], + ]); + }); + + it('should handle newlines within quoted fields', () => { + const csv = 'name,bio\nAlice,"Line1\nLine2"\nBob,simple'; + const result = parseCsv(csv); + expect(result).toEqual([ + ['name', 'bio'], + ['Alice', 'Line1\nLine2'], + ['Bob', 'simple'], + ]); 
+ }); +}); + +describe('CsvDataSource', () => { + function createTempCsv(content: string): string { + const dir = join(tmpdir(), `takt-csv-test-${randomUUID()}`); + mkdirSync(dir, { recursive: true }); + const filePath = join(dir, 'test.csv'); + writeFileSync(filePath, content, 'utf-8'); + return filePath; + } + + it('should read batches with batch_size 1', async () => { + const filePath = createTempCsv('name,age\nAlice,30\nBob,25\nCharlie,35'); + const source = new CsvDataSource(filePath); + const batches = await source.readBatches(1); + + expect(batches).toHaveLength(3); + expect(batches[0]!.rows).toEqual([{ name: 'Alice', age: '30' }]); + expect(batches[0]!.batchIndex).toBe(0); + expect(batches[0]!.totalBatches).toBe(3); + expect(batches[1]!.rows).toEqual([{ name: 'Bob', age: '25' }]); + expect(batches[2]!.rows).toEqual([{ name: 'Charlie', age: '35' }]); + }); + + it('should read batches with batch_size 2', async () => { + const filePath = createTempCsv('name,age\nAlice,30\nBob,25\nCharlie,35'); + const source = new CsvDataSource(filePath); + const batches = await source.readBatches(2); + + expect(batches).toHaveLength(2); + expect(batches[0]!.rows).toEqual([ + { name: 'Alice', age: '30' }, + { name: 'Bob', age: '25' }, + ]); + expect(batches[0]!.totalBatches).toBe(2); + expect(batches[1]!.rows).toEqual([ + { name: 'Charlie', age: '35' }, + ]); + }); + + it('should throw when CSV has no data rows', async () => { + const filePath = createTempCsv('name,age'); + const source = new CsvDataSource(filePath); + await expect(source.readBatches(1)).rejects.toThrow('CSV file has no data rows'); + }); + + it('should handle missing columns by returning empty string', async () => { + const filePath = createTempCsv('a,b,c\n1,2\n3'); + const source = new CsvDataSource(filePath); + const batches = await source.readBatches(1); + + expect(batches[0]!.rows).toEqual([{ a: '1', b: '2', c: '' }]); + expect(batches[1]!.rows).toEqual([{ a: '3', b: '', c: '' }]); + }); +}); diff --git 
a/src/__tests__/arpeggio-data-source-factory.test.ts b/src/__tests__/arpeggio-data-source-factory.test.ts new file mode 100644 index 0000000..b3bc896 --- /dev/null +++ b/src/__tests__/arpeggio-data-source-factory.test.ts @@ -0,0 +1,50 @@ +/** + * Tests for the arpeggio data source factory. + * + * Covers: + * - Built-in 'csv' source type returns CsvDataSource + * - Custom module: valid default export returns a data source + * - Custom module: non-function default export throws + * - Custom module: missing default export throws + */ + +import { describe, it, expect } from 'vitest'; +import { createDataSource } from '../core/piece/arpeggio/data-source-factory.js'; +import { CsvDataSource } from '../core/piece/arpeggio/csv-data-source.js'; + +describe('createDataSource', () => { + it('should return a CsvDataSource for built-in "csv" type', async () => { + const source = await createDataSource('csv', '/path/to/data.csv'); + expect(source).toBeInstanceOf(CsvDataSource); + }); + + it('should return a valid data source from a custom module with correct default export', async () => { + const tempModulePath = new URL( + 'data:text/javascript,export default function(path) { return { readBatches: async () => [] }; }' + ).href; + + const source = await createDataSource(tempModulePath, '/some/path'); + expect(source).toBeDefined(); + expect(typeof source.readBatches).toBe('function'); + }); + + it('should throw when custom module does not export a default function', async () => { + const tempModulePath = new URL( + 'data:text/javascript,export default "not-a-function"' + ).href; + + await expect(createDataSource(tempModulePath, '/some/path')).rejects.toThrow( + /must export a default factory function/ + ); + }); + + it('should throw when custom module has no default export', async () => { + const tempModulePath = new URL( + 'data:text/javascript,export const foo = 42' + ).href; + + await expect(createDataSource(tempModulePath, '/some/path')).rejects.toThrow( + /must export a 
default factory function/ + ); + }); +}); diff --git a/src/__tests__/arpeggio-merge.test.ts b/src/__tests__/arpeggio-merge.test.ts new file mode 100644 index 0000000..a27f8c5 --- /dev/null +++ b/src/__tests__/arpeggio-merge.test.ts @@ -0,0 +1,108 @@ +/** + * Tests for arpeggio merge processing. + */ + +import { describe, it, expect } from 'vitest'; +import { buildMergeFn } from '../core/piece/arpeggio/merge.js'; +import type { ArpeggioMergeMovementConfig } from '../core/piece/arpeggio/types.js'; +import type { BatchResult } from '../core/piece/arpeggio/types.js'; + +function makeResult(batchIndex: number, content: string, success = true): BatchResult { + return { batchIndex, content, success }; +} + +function makeFailedResult(batchIndex: number, error: string): BatchResult { + return { batchIndex, content: '', success: false, error }; +} + +describe('buildMergeFn', () => { + describe('concat strategy', () => { + it('should concatenate results with default separator (newline)', async () => { + const config: ArpeggioMergeMovementConfig = { strategy: 'concat' }; + const mergeFn = await buildMergeFn(config); + const results = [ + makeResult(0, 'Result A'), + makeResult(1, 'Result B'), + makeResult(2, 'Result C'), + ]; + expect(mergeFn(results)).toBe('Result A\nResult B\nResult C'); + }); + + it('should concatenate results with custom separator', async () => { + const config: ArpeggioMergeMovementConfig = { strategy: 'concat', separator: '\n---\n' }; + const mergeFn = await buildMergeFn(config); + const results = [ + makeResult(0, 'A'), + makeResult(1, 'B'), + ]; + expect(mergeFn(results)).toBe('A\n---\nB'); + }); + + it('should sort results by batch index', async () => { + const config: ArpeggioMergeMovementConfig = { strategy: 'concat' }; + const mergeFn = await buildMergeFn(config); + const results = [ + makeResult(2, 'C'), + makeResult(0, 'A'), + makeResult(1, 'B'), + ]; + expect(mergeFn(results)).toBe('A\nB\nC'); + }); + + it('should filter out failed results', 
async () => { + const config: ArpeggioMergeMovementConfig = { strategy: 'concat' }; + const mergeFn = await buildMergeFn(config); + const results = [ + makeResult(0, 'A'), + makeFailedResult(1, 'oops'), + makeResult(2, 'C'), + ]; + expect(mergeFn(results)).toBe('A\nC'); + }); + + it('should return empty string when all results failed', async () => { + const config: ArpeggioMergeMovementConfig = { strategy: 'concat' }; + const mergeFn = await buildMergeFn(config); + const results = [ + makeFailedResult(0, 'error1'), + makeFailedResult(1, 'error2'), + ]; + expect(mergeFn(results)).toBe(''); + }); + }); + + describe('custom strategy with inline_js', () => { + it('should execute inline JS merge function', async () => { + const config: ArpeggioMergeMovementConfig = { + strategy: 'custom', + inlineJs: 'return results.filter(r => r.success).map(r => r.content.toUpperCase()).join(", ");', + }; + const mergeFn = await buildMergeFn(config); + const results = [ + makeResult(0, 'hello'), + makeResult(1, 'world'), + ]; + expect(mergeFn(results)).toBe('HELLO, WORLD'); + }); + + it('should throw when inline JS returns non-string', async () => { + const config: ArpeggioMergeMovementConfig = { + strategy: 'custom', + inlineJs: 'return 42;', + }; + const mergeFn = await buildMergeFn(config); + expect(() => mergeFn([makeResult(0, 'test')])).toThrow( + 'Inline JS merge function must return a string, got number' + ); + }); + }); + + describe('custom strategy validation', () => { + it('should throw when custom strategy has neither inline_js nor file', async () => { + const config: ArpeggioMergeMovementConfig = { strategy: 'custom' }; + await expect(buildMergeFn(config)).rejects.toThrow( + 'Custom merge strategy requires either inline_js or file path' + ); + }); + }); +}); diff --git a/src/__tests__/arpeggio-schema.test.ts b/src/__tests__/arpeggio-schema.test.ts new file mode 100644 index 0000000..e121945 --- /dev/null +++ b/src/__tests__/arpeggio-schema.test.ts @@ -0,0 +1,332 @@ +/** + 
* Tests for Arpeggio-related Zod schemas. + * + * Covers: + * - ArpeggioMergeRawSchema cross-validation (.refine()) + * - ArpeggioConfigRawSchema required fields and defaults + * - PieceMovementRawSchema with arpeggio field + */ + +import { describe, it, expect } from 'vitest'; +import { + ArpeggioMergeRawSchema, + ArpeggioConfigRawSchema, + PieceMovementRawSchema, +} from '../core/models/index.js'; + +describe('ArpeggioMergeRawSchema', () => { + it('should accept concat strategy without inline_js or file', () => { + const result = ArpeggioMergeRawSchema.safeParse({ + strategy: 'concat', + }); + expect(result.success).toBe(true); + }); + + it('should accept concat strategy with separator', () => { + const result = ArpeggioMergeRawSchema.safeParse({ + strategy: 'concat', + separator: '\n---\n', + }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.separator).toBe('\n---\n'); + } + }); + + it('should default strategy to concat when omitted', () => { + const result = ArpeggioMergeRawSchema.safeParse({}); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.strategy).toBe('concat'); + } + }); + + it('should accept custom strategy with inline_js', () => { + const result = ArpeggioMergeRawSchema.safeParse({ + strategy: 'custom', + inline_js: 'return results.map(r => r.content).join(",");', + }); + expect(result.success).toBe(true); + }); + + it('should accept custom strategy with file', () => { + const result = ArpeggioMergeRawSchema.safeParse({ + strategy: 'custom', + file: './merge.js', + }); + expect(result.success).toBe(true); + }); + + it('should reject custom strategy without inline_js or file', () => { + const result = ArpeggioMergeRawSchema.safeParse({ + strategy: 'custom', + }); + expect(result.success).toBe(false); + }); + + it('should reject concat strategy with inline_js', () => { + const result = ArpeggioMergeRawSchema.safeParse({ + strategy: 'concat', + inline_js: 'return "hello";', + }); + 
expect(result.success).toBe(false); + }); + + it('should reject concat strategy with file', () => { + const result = ArpeggioMergeRawSchema.safeParse({ + strategy: 'concat', + file: './merge.js', + }); + expect(result.success).toBe(false); + }); + + it('should reject invalid strategy value', () => { + const result = ArpeggioMergeRawSchema.safeParse({ + strategy: 'invalid', + }); + expect(result.success).toBe(false); + }); +}); + +describe('ArpeggioConfigRawSchema', () => { + const validConfig = { + source: 'csv', + source_path: './data.csv', + template: './template.md', + }; + + it('should accept a valid minimal config', () => { + const result = ArpeggioConfigRawSchema.safeParse(validConfig); + expect(result.success).toBe(true); + }); + + it('should apply default values for optional fields', () => { + const result = ArpeggioConfigRawSchema.safeParse(validConfig); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.batch_size).toBe(1); + expect(result.data.concurrency).toBe(1); + expect(result.data.max_retries).toBe(2); + expect(result.data.retry_delay_ms).toBe(1000); + } + }); + + it('should accept explicit values overriding defaults', () => { + const result = ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + batch_size: 5, + concurrency: 3, + max_retries: 4, + retry_delay_ms: 2000, + }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.batch_size).toBe(5); + expect(result.data.concurrency).toBe(3); + expect(result.data.max_retries).toBe(4); + expect(result.data.retry_delay_ms).toBe(2000); + } + }); + + it('should accept config with merge field', () => { + const result = ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + merge: { strategy: 'concat', separator: '---' }, + }); + expect(result.success).toBe(true); + }); + + it('should accept config with output_path', () => { + const result = ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + output_path: './output.txt', + }); + 
expect(result.success).toBe(true); + if (result.success) { + expect(result.data.output_path).toBe('./output.txt'); + } + }); + + it('should reject when source is empty', () => { + const result = ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + source: '', + }); + expect(result.success).toBe(false); + }); + + it('should reject when source is missing', () => { + const { source: _, ...noSource } = validConfig; + const result = ArpeggioConfigRawSchema.safeParse(noSource); + expect(result.success).toBe(false); + }); + + it('should reject when source_path is empty', () => { + const result = ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + source_path: '', + }); + expect(result.success).toBe(false); + }); + + it('should reject when source_path is missing', () => { + const { source_path: _, ...noSourcePath } = validConfig; + const result = ArpeggioConfigRawSchema.safeParse(noSourcePath); + expect(result.success).toBe(false); + }); + + it('should reject when template is empty', () => { + const result = ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + template: '', + }); + expect(result.success).toBe(false); + }); + + it('should reject when template is missing', () => { + const { template: _, ...noTemplate } = validConfig; + const result = ArpeggioConfigRawSchema.safeParse(noTemplate); + expect(result.success).toBe(false); + }); + + it('should reject batch_size of 0', () => { + const result = ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + batch_size: 0, + }); + expect(result.success).toBe(false); + }); + + it('should reject negative batch_size', () => { + const result = ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + batch_size: -1, + }); + expect(result.success).toBe(false); + }); + + it('should reject concurrency of 0', () => { + const result = ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + concurrency: 0, + }); + expect(result.success).toBe(false); + }); + + it('should reject negative concurrency', () => { + const result = 
ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + concurrency: -1, + }); + expect(result.success).toBe(false); + }); + + it('should reject negative max_retries', () => { + const result = ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + max_retries: -1, + }); + expect(result.success).toBe(false); + }); + + it('should accept max_retries of 0', () => { + const result = ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + max_retries: 0, + }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.max_retries).toBe(0); + } + }); + + it('should reject non-integer batch_size', () => { + const result = ArpeggioConfigRawSchema.safeParse({ + ...validConfig, + batch_size: 1.5, + }); + expect(result.success).toBe(false); + }); +}); + +describe('PieceMovementRawSchema with arpeggio', () => { + it('should accept a movement with arpeggio config', () => { + const raw = { + name: 'batch-process', + arpeggio: { + source: 'csv', + source_path: './data.csv', + template: './prompt.md', + }, + }; + + const result = PieceMovementRawSchema.safeParse(raw); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.arpeggio).toBeDefined(); + expect(result.data.arpeggio!.source).toBe('csv'); + } + }); + + it('should accept a movement with arpeggio and rules', () => { + const raw = { + name: 'batch-process', + arpeggio: { + source: 'csv', + source_path: './data.csv', + template: './prompt.md', + batch_size: 2, + concurrency: 3, + }, + rules: [ + { condition: 'All processed', next: 'COMPLETE' }, + { condition: 'Errors found', next: 'fix' }, + ], + }; + + const result = PieceMovementRawSchema.safeParse(raw); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.arpeggio!.batch_size).toBe(2); + expect(result.data.arpeggio!.concurrency).toBe(3); + expect(result.data.rules).toHaveLength(2); + } + }); + + it('should accept a movement without arpeggio (normal movement)', () => { + const raw = { + name: 
'normal-step', + persona: 'coder.md', + instruction_template: 'Do work', + }; + + const result = PieceMovementRawSchema.safeParse(raw); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.arpeggio).toBeUndefined(); + } + }); + + it('should accept a movement with arpeggio including custom merge', () => { + const raw = { + name: 'custom-merge-step', + arpeggio: { + source: 'csv', + source_path: './data.csv', + template: './prompt.md', + merge: { + strategy: 'custom', + inline_js: 'return results.map(r => r.content).join(", ");', + }, + output_path: './output.txt', + }, + }; + + const result = PieceMovementRawSchema.safeParse(raw); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.arpeggio!.merge).toBeDefined(); + expect(result.data.arpeggio!.output_path).toBe('./output.txt'); + } + }); +}); diff --git a/src/__tests__/arpeggio-template.test.ts b/src/__tests__/arpeggio-template.test.ts new file mode 100644 index 0000000..1fcc8d3 --- /dev/null +++ b/src/__tests__/arpeggio-template.test.ts @@ -0,0 +1,83 @@ +/** + * Tests for arpeggio template expansion. 
+ */ + +import { describe, it, expect } from 'vitest'; +import { expandTemplate } from '../core/piece/arpeggio/template.js'; +import type { DataBatch } from '../core/piece/arpeggio/types.js'; + +function makeBatch(rows: Record[], batchIndex = 0, totalBatches = 1): DataBatch { + return { rows, batchIndex, totalBatches }; +} + +describe('expandTemplate', () => { + it('should expand {line:1} with formatted row data', () => { + const batch = makeBatch([{ name: 'Alice', age: '30' }]); + const result = expandTemplate('Process this: {line:1}', batch); + expect(result).toBe('Process this: name: Alice\nage: 30'); + }); + + it('should expand {line:1} and {line:2} for multi-row batches', () => { + const batch = makeBatch([ + { name: 'Alice', age: '30' }, + { name: 'Bob', age: '25' }, + ]); + const result = expandTemplate('Row 1: {line:1}\nRow 2: {line:2}', batch); + expect(result).toBe('Row 1: name: Alice\nage: 30\nRow 2: name: Bob\nage: 25'); + }); + + it('should expand {col:N:name} with specific column values', () => { + const batch = makeBatch([{ name: 'Alice', age: '30', city: 'Tokyo' }]); + const result = expandTemplate('Name: {col:1:name}, City: {col:1:city}', batch); + expect(result).toBe('Name: Alice, City: Tokyo'); + }); + + it('should expand {batch_index} and {total_batches}', () => { + const batch = makeBatch([{ name: 'Alice' }], 2, 5); + const result = expandTemplate('Batch {batch_index} of {total_batches}', batch); + expect(result).toBe('Batch 2 of 5'); + }); + + it('should expand all placeholder types in a single template', () => { + const batch = makeBatch([ + { name: 'Alice', role: 'dev' }, + { name: 'Bob', role: 'pm' }, + ], 0, 3); + const template = 'Batch {batch_index}/{total_batches}\nFirst: {col:1:name}\nSecond: {line:2}'; + const result = expandTemplate(template, batch); + expect(result).toBe('Batch 0/3\nFirst: Alice\nSecond: name: Bob\nrole: pm'); + }); + + it('should throw when {line:N} references out-of-range row', () => { + const batch = makeBatch([{ 
name: 'Alice' }]); + expect(() => expandTemplate('{line:2}', batch)).toThrow( + 'Template placeholder {line:2} references row 2 but batch has 1 rows' + ); + }); + + it('should throw when {col:N:name} references out-of-range row', () => { + const batch = makeBatch([{ name: 'Alice' }]); + expect(() => expandTemplate('{col:2:name}', batch)).toThrow( + 'Template placeholder {col:2:name} references row 2 but batch has 1 rows' + ); + }); + + it('should throw when {col:N:name} references unknown column', () => { + const batch = makeBatch([{ name: 'Alice' }]); + expect(() => expandTemplate('{col:1:missing}', batch)).toThrow( + 'Template placeholder {col:1:missing} references unknown column "missing"' + ); + }); + + it('should handle templates with no placeholders', () => { + const batch = makeBatch([{ name: 'Alice' }]); + const result = expandTemplate('No placeholders here', batch); + expect(result).toBe('No placeholders here'); + }); + + it('should handle multiple occurrences of the same placeholder', () => { + const batch = makeBatch([{ name: 'Alice' }], 1, 3); + const result = expandTemplate('{batch_index} and {batch_index}', batch); + expect(result).toBe('1 and 1'); + }); +}); diff --git a/src/__tests__/engine-arpeggio.test.ts b/src/__tests__/engine-arpeggio.test.ts new file mode 100644 index 0000000..6b5618c --- /dev/null +++ b/src/__tests__/engine-arpeggio.test.ts @@ -0,0 +1,275 @@ +/** + * Integration tests for arpeggio movement execution via PieceEngine. + * + * Tests the full pipeline: CSV → template expansion → LLM → merge → rule evaluation. 
+ */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { writeFileSync, mkdirSync } from 'node:fs'; +import { join } from 'node:path'; + +// Mock external dependencies before importing +vi.mock('../agents/runner.js', () => ({ + runAgent: vi.fn(), +})); + +vi.mock('../core/piece/evaluation/index.js', () => ({ + detectMatchedRule: vi.fn(), + evaluateAggregateConditions: vi.fn(), +})); + +vi.mock('../core/piece/phase-runner.js', () => ({ + needsStatusJudgmentPhase: vi.fn().mockReturnValue(false), + runReportPhase: vi.fn().mockResolvedValue(undefined), + runStatusJudgmentPhase: vi.fn().mockResolvedValue(''), +})); + +vi.mock('../shared/utils/index.js', async () => { + const actual = await vi.importActual('../shared/utils/index.js'); + return { + ...actual, + generateReportDir: vi.fn().mockReturnValue('test-report-dir'), + }; +}); + +import { runAgent } from '../agents/runner.js'; +import { detectMatchedRule } from '../core/piece/evaluation/index.js'; +import { PieceEngine } from '../core/piece/engine/PieceEngine.js'; +import type { PieceConfig, PieceMovement, AgentResponse, ArpeggioMovementConfig } from '../core/models/index.js'; +import type { PieceEngineOptions } from '../core/piece/types.js'; +import { + makeResponse, + makeMovement, + makeRule, + createTestTmpDir, + cleanupPieceEngine, +} from './engine-test-helpers.js'; +import type { RuleMatch } from '../core/piece/index.js'; + +function createArpeggioTestDir(): { tmpDir: string; csvPath: string; templatePath: string } { + const tmpDir = createTestTmpDir(); + const csvPath = join(tmpDir, 'data.csv'); + const templatePath = join(tmpDir, 'template.md'); + + writeFileSync(csvPath, 'name,task\nAlice,review\nBob,implement\nCharlie,test', 'utf-8'); + writeFileSync(templatePath, 'Process {line:1}', 'utf-8'); + + return { tmpDir, csvPath, templatePath }; +} + +function createArpeggioConfig(csvPath: string, templatePath: string, overrides: Partial = {}): ArpeggioMovementConfig { + return 
{ + source: 'csv', + sourcePath: csvPath, + batchSize: 1, + concurrency: 1, + templatePath, + merge: { strategy: 'concat' }, + maxRetries: 0, + retryDelayMs: 0, + ...overrides, + }; +} + +function buildArpeggioPieceConfig(arpeggioConfig: ArpeggioMovementConfig, tmpDir: string): PieceConfig { + return { + name: 'test-arpeggio', + description: 'Test arpeggio piece', + maxIterations: 10, + initialMovement: 'process', + movements: [ + { + ...makeMovement('process', { + rules: [ + makeRule('Processing complete', 'COMPLETE'), + makeRule('Processing failed', 'ABORT'), + ], + }), + arpeggio: arpeggioConfig, + }, + ], + }; +} + +function createEngineOptions(tmpDir: string): PieceEngineOptions { + return { + projectCwd: tmpDir, + detectRuleIndex: () => 0, + callAiJudge: async () => 0, + }; +} + +describe('ArpeggioRunner integration', () => { + let engine: PieceEngine | undefined; + + beforeEach(() => { + vi.resetAllMocks(); + vi.mocked(detectMatchedRule).mockResolvedValue(undefined); + }); + + afterEach(() => { + if (engine) { + cleanupPieceEngine(engine); + engine = undefined; + } + }); + + it('should process CSV data and merge results', async () => { + const { tmpDir, csvPath, templatePath } = createArpeggioTestDir(); + const arpeggioConfig = createArpeggioConfig(csvPath, templatePath); + const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir); + + // Mock agent to return batch-specific responses + const mockAgent = vi.mocked(runAgent); + mockAgent + .mockResolvedValueOnce(makeResponse({ content: 'Processed Alice' })) + .mockResolvedValueOnce(makeResponse({ content: 'Processed Bob' })) + .mockResolvedValueOnce(makeResponse({ content: 'Processed Charlie' })); + + // Mock rule detection for the merged result + vi.mocked(detectMatchedRule).mockResolvedValueOnce({ + index: 0, + method: 'phase1_tag', + }); + + engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir)); + const state = await engine.run(); + + 
expect(state.status).toBe('completed'); + expect(mockAgent).toHaveBeenCalledTimes(3); + + // Verify merged content in movement output + const output = state.movementOutputs.get('process'); + expect(output).toBeDefined(); + expect(output!.content).toBe('Processed Alice\nProcessed Bob\nProcessed Charlie'); + }); + + it('should handle batch_size > 1', async () => { + const tmpDir = createTestTmpDir(); + const csvPath = join(tmpDir, 'data.csv'); + const templatePath = join(tmpDir, 'batch-template.md'); + // 4 rows so batch_size=2 gives exactly 2 batches with 2 rows each + writeFileSync(csvPath, 'name,task\nAlice,review\nBob,implement\nCharlie,test\nDave,deploy', 'utf-8'); + writeFileSync(templatePath, 'Row1: {line:1}\nRow2: {line:2}', 'utf-8'); + + const arpeggioConfig = createArpeggioConfig(csvPath, templatePath, { batchSize: 2 }); + const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir); + + const mockAgent = vi.mocked(runAgent); + mockAgent + .mockResolvedValueOnce(makeResponse({ content: 'Batch 0 result' })) + .mockResolvedValueOnce(makeResponse({ content: 'Batch 1 result' })); + + vi.mocked(detectMatchedRule).mockResolvedValueOnce({ + index: 0, + method: 'phase1_tag', + }); + + engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir)); + const state = await engine.run(); + + expect(state.status).toBe('completed'); + // 4 rows / batch_size 2 = 2 batches + expect(mockAgent).toHaveBeenCalledTimes(2); + }); + + it('should abort when a batch fails and retries are exhausted', async () => { + const { tmpDir, csvPath, templatePath } = createArpeggioTestDir(); + const arpeggioConfig = createArpeggioConfig(csvPath, templatePath, { + maxRetries: 1, + retryDelayMs: 0, + }); + const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir); + + const mockAgent = vi.mocked(runAgent); + // First batch succeeds + mockAgent.mockResolvedValueOnce(makeResponse({ content: 'OK' })); + // Second batch fails twice (initial + 1 retry) + 
mockAgent.mockResolvedValueOnce(makeResponse({ status: 'error', error: 'fail1' })); + mockAgent.mockResolvedValueOnce(makeResponse({ status: 'error', error: 'fail2' })); + // Third batch succeeds + mockAgent.mockResolvedValueOnce(makeResponse({ content: 'OK' })); + + engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir)); + const state = await engine.run(); + + expect(state.status).toBe('aborted'); + }); + + it('should write output file when output_path is configured', async () => { + const { tmpDir, csvPath, templatePath } = createArpeggioTestDir(); + const outputPath = join(tmpDir, 'output.txt'); + const arpeggioConfig = createArpeggioConfig(csvPath, templatePath, { outputPath }); + const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir); + + const mockAgent = vi.mocked(runAgent); + mockAgent + .mockResolvedValueOnce(makeResponse({ content: 'Result A' })) + .mockResolvedValueOnce(makeResponse({ content: 'Result B' })) + .mockResolvedValueOnce(makeResponse({ content: 'Result C' })); + + vi.mocked(detectMatchedRule).mockResolvedValueOnce({ + index: 0, + method: 'phase1_tag', + }); + + engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir)); + await engine.run(); + + const { readFileSync } = await import('node:fs'); + const outputContent = readFileSync(outputPath, 'utf-8'); + expect(outputContent).toBe('Result A\nResult B\nResult C'); + }); + + it('should handle concurrency > 1', async () => { + const { tmpDir, csvPath, templatePath } = createArpeggioTestDir(); + const arpeggioConfig = createArpeggioConfig(csvPath, templatePath, { concurrency: 3 }); + const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir); + + const mockAgent = vi.mocked(runAgent); + mockAgent + .mockResolvedValueOnce(makeResponse({ content: 'A' })) + .mockResolvedValueOnce(makeResponse({ content: 'B' })) + .mockResolvedValueOnce(makeResponse({ content: 'C' })); + + vi.mocked(detectMatchedRule).mockResolvedValueOnce({ + index: 
0, + method: 'phase1_tag', + }); + + engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir)); + const state = await engine.run(); + + expect(state.status).toBe('completed'); + expect(mockAgent).toHaveBeenCalledTimes(3); + }); + + it('should use custom merge function when configured', async () => { + const { tmpDir, csvPath, templatePath } = createArpeggioTestDir(); + const arpeggioConfig = createArpeggioConfig(csvPath, templatePath, { + merge: { + strategy: 'custom', + inlineJs: 'return results.filter(r => r.success).map(r => r.content).join(" | ");', + }, + }); + const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir); + + const mockAgent = vi.mocked(runAgent); + mockAgent + .mockResolvedValueOnce(makeResponse({ content: 'X' })) + .mockResolvedValueOnce(makeResponse({ content: 'Y' })) + .mockResolvedValueOnce(makeResponse({ content: 'Z' })); + + vi.mocked(detectMatchedRule).mockResolvedValueOnce({ + index: 0, + method: 'phase1_tag', + }); + + engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir)); + const state = await engine.run(); + + expect(state.status).toBe('completed'); + const output = state.movementOutputs.get('process'); + expect(output!.content).toBe('X | Y | Z'); + }); +}); diff --git a/src/core/models/index.ts b/src/core/models/index.ts index 166becd..bf9b5ef 100644 --- a/src/core/models/index.ts +++ b/src/core/models/index.ts @@ -12,6 +12,8 @@ export type { SessionState, PieceRule, PieceMovement, + ArpeggioMovementConfig, + ArpeggioMergeMovementConfig, LoopDetectionConfig, LoopMonitorConfig, LoopMonitorJudge, diff --git a/src/core/models/piece-types.ts b/src/core/models/piece-types.ts index aec8147..f1d4d42 100644 --- a/src/core/models/piece-types.ts +++ b/src/core/models/piece-types.ts @@ -114,12 +114,48 @@ export interface PieceMovement { passPreviousResponse: boolean; /** Sub-movements to execute in parallel. When set, this movement runs all sub-movements concurrently. 
*/ parallel?: PieceMovement[]; + /** Arpeggio configuration for data-driven batch processing. When set, this movement reads from a data source, expands templates, and calls LLM per batch. */ + arpeggio?: ArpeggioMovementConfig; /** Resolved policy content strings (from piece-level policies map, resolved at parse time) */ policyContents?: string[]; /** Resolved knowledge content strings (from piece-level knowledge map, resolved at parse time) */ knowledgeContents?: string[]; } +/** Merge configuration for arpeggio results */ +export interface ArpeggioMergeMovementConfig { + /** Merge strategy: 'concat' (default), 'custom' */ + readonly strategy: 'concat' | 'custom'; + /** Inline JS merge function body (for custom strategy) */ + readonly inlineJs?: string; + /** Path to external JS merge file (for custom strategy, resolved to absolute) */ + readonly filePath?: string; + /** Separator for concat strategy (default: '\n') */ + readonly separator?: string; +} + +/** Arpeggio configuration for data-driven batch processing movements */ +export interface ArpeggioMovementConfig { + /** Data source type (e.g., 'csv') */ + readonly source: string; + /** Path to the data source file (resolved to absolute) */ + readonly sourcePath: string; + /** Number of rows per batch (default: 1) */ + readonly batchSize: number; + /** Number of concurrent LLM calls (default: 1) */ + readonly concurrency: number; + /** Path to prompt template file (resolved to absolute) */ + readonly templatePath: string; + /** Merge configuration */ + readonly merge: ArpeggioMergeMovementConfig; + /** Maximum retry attempts per batch (default: 2) */ + readonly maxRetries: number; + /** Delay between retries in ms (default: 1000) */ + readonly retryDelayMs: number; + /** Optional output file path (resolved to absolute) */ + readonly outputPath?: string; +} + /** Loop detection configuration */ export interface LoopDetectionConfig { /** Maximum consecutive runs of the same step before triggering (default: 10) 
*/ diff --git a/src/core/models/schemas.ts b/src/core/models/schemas.ts index df0d6b8..8345a47 100644 --- a/src/core/models/schemas.ts +++ b/src/core/models/schemas.ts @@ -130,6 +130,46 @@ export const PieceRuleSchema = z.object({ interactive_only: z.boolean().optional(), }); +/** Arpeggio merge configuration schema */ +export const ArpeggioMergeRawSchema = z.object({ + /** Merge strategy: 'concat' or 'custom' */ + strategy: z.enum(['concat', 'custom']).optional().default('concat'), + /** Inline JS function body for custom merge */ + inline_js: z.string().optional(), + /** External JS file path for custom merge */ + file: z.string().optional(), + /** Separator for concat strategy */ + separator: z.string().optional(), +}).refine( + (data) => data.strategy !== 'custom' || data.inline_js != null || data.file != null, + { message: "Custom merge strategy requires either 'inline_js' or 'file'" } +).refine( + (data) => data.strategy !== 'concat' || (data.inline_js == null && data.file == null), + { message: "Concat merge strategy does not accept 'inline_js' or 'file'" } +); + +/** Arpeggio configuration schema for data-driven batch processing */ +export const ArpeggioConfigRawSchema = z.object({ + /** Data source type (e.g., 'csv') */ + source: z.string().min(1), + /** Path to the data source file */ + source_path: z.string().min(1), + /** Number of rows per batch (default: 1) */ + batch_size: z.number().int().positive().optional().default(1), + /** Number of concurrent LLM calls (default: 1) */ + concurrency: z.number().int().positive().optional().default(1), + /** Path to prompt template file */ + template: z.string().min(1), + /** Merge configuration */ + merge: ArpeggioMergeRawSchema.optional(), + /** Maximum retry attempts per batch (default: 2) */ + max_retries: z.number().int().min(0).optional().default(2), + /** Delay between retries in ms (default: 1000) */ + retry_delay_ms: z.number().int().min(0).optional().default(1000), + /** Optional output file path */ + 
output_path: z.string().optional(), +}); + /** Sub-movement schema for parallel execution */ export const ParallelSubMovementRawSchema = z.object({ name: z.string().min(1), @@ -190,6 +230,8 @@ export const PieceMovementRawSchema = z.object({ pass_previous_response: z.boolean().optional().default(true), /** Sub-movements to execute in parallel */ parallel: z.array(ParallelSubMovementRawSchema).optional(), + /** Arpeggio configuration for data-driven batch processing */ + arpeggio: ArpeggioConfigRawSchema.optional(), }); /** Loop monitor rule schema */ diff --git a/src/core/models/types.ts b/src/core/models/types.ts index b13a24a..a578752 100644 --- a/src/core/models/types.ts +++ b/src/core/models/types.ts @@ -31,6 +31,8 @@ export type { OutputContractEntry, McpServerConfig, PieceMovement, + ArpeggioMovementConfig, + ArpeggioMergeMovementConfig, LoopDetectionConfig, LoopMonitorConfig, LoopMonitorJudge, diff --git a/src/core/piece/arpeggio/csv-data-source.ts b/src/core/piece/arpeggio/csv-data-source.ts new file mode 100644 index 0000000..ca17755 --- /dev/null +++ b/src/core/piece/arpeggio/csv-data-source.ts @@ -0,0 +1,133 @@ +/** + * CSV data source for arpeggio movements. + * + * Reads CSV files and returns data in batches for template expansion. + * Handles quoted fields, escaped quotes, and various line endings. 
/**
 * Parse a CSV string into an array of string arrays (rows of fields).
 *
 * Supports double-quoted fields (which may contain commas and line breaks),
 * `""` as an escaped quote inside a quoted field, and `\n`, `\r\n` or bare
 * `\r` as row terminators. A leading UTF-8 byte-order mark is dropped so it
 * neither ends up inside the first header name nor prevents a quoted first
 * field from being recognised.
 *
 * @param content - Raw CSV text.
 * @returns One array of field strings per row; empty input yields `[]`.
 */
export function parseCsv(content: string): string[][] {
  // Spreadsheet exports commonly start with U+FEFF; it is not data.
  const text = content.charCodeAt(0) === 0xfeff ? content.slice(1) : content;

  const rows: string[][] = [];
  let currentRow: string[] = [];
  let currentField = '';
  let inQuotes = false;
  let i = 0;

  /** Close the field and the row being built and start a fresh row. */
  const endRow = (): void => {
    currentRow.push(currentField);
    currentField = '';
    rows.push(currentRow);
    currentRow = [];
  };

  while (i < text.length) {
    const char = text[i]!;

    if (inQuotes) {
      if (char !== '"') {
        currentField += char;
        i++;
        continue;
      }
      // A doubled quote is an escaped literal quote.
      if (text[i + 1] === '"') {
        currentField += '"';
        i += 2;
        continue;
      }
      // A single quote ends the quoted section.
      inQuotes = false;
      i++;
      continue;
    }

    // A quote only opens a quoted section at the very start of a field.
    if (char === '"' && currentField.length === 0) {
      inQuotes = true;
      i++;
      continue;
    }

    if (char === ',') {
      currentRow.push(currentField);
      currentField = '';
      i++;
      continue;
    }

    if (char === '\r') {
      // Treat \r\n as one terminator, and a bare \r as a terminator too.
      endRow();
      i += text[i + 1] === '\n' ? 2 : 1;
      continue;
    }

    if (char === '\n') {
      endRow();
      i++;
      continue;
    }

    currentField += char;
    i++;
  }

  // Flush the final row when the input does not end with a line terminator.
  if (currentField.length > 0 || currentRow.length > 0) {
    currentRow.push(currentField);
    rows.push(currentRow);
  }

  return rows;
}

/**
 * Convert parsed CSV rows into DataRow objects keyed by the header row.
 * Cells missing from a short row are filled with the empty string; cells
 * beyond the header count are ignored.
 */
function rowsToDataRows(headers: readonly string[], dataRows: readonly string[][]): DataRow[] {
  return dataRows.map((row) => {
    const dataRow: DataRow = {};
    headers.forEach((header, col) => {
      dataRow[header] = row[col] ?? '';
    });
    return dataRow;
  });
}

/**
 * Split an array into consecutive chunks of at most `size` elements.
 *
 * @throws RangeError when `size` is not a positive integer (a size of 0 would
 *         otherwise never advance the loop).
 */
function chunk<T>(array: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError(`Batch size must be a positive integer, got ${size}`);
  }
  const chunks: T[][] = [];
  for (let start = 0; start < array.length; start += size) {
    chunks.push(array.slice(start, start + size));
  }
  return chunks;
}

/** Data source that reads a CSV file and serves its rows in batches. */
export class CsvDataSource implements ArpeggioDataSource {
  constructor(private readonly filePath: string) {}

  /**
   * Read the whole file and split its data rows into batches.
   *
   * @param batchSize - Maximum number of rows per batch; must be a positive integer.
   * @returns All batches, each carrying its 0-based index and the batch total.
   * @throws RangeError when `batchSize` is not a positive integer.
   * @throws Error when the file holds fewer than two rows (header plus data).
   */
  async readBatches(batchSize: number): Promise<DataBatch[]> {
    // Validate before touching the file system.
    if (!Number.isInteger(batchSize) || batchSize < 1) {
      throw new RangeError(`Batch size must be a positive integer, got ${batchSize}`);
    }

    const parsed = parseCsv(readFileSync(this.filePath, 'utf-8'));
    if (parsed.length < 2) {
      throw new Error(`CSV file has no data rows: ${this.filePath}`);
    }

    const [headers, ...dataRowArrays] = parsed;
    const chunks = chunk(rowsToDataRows(headers!, dataRowArrays), batchSize);
    const totalBatches = chunks.length;

    return chunks.map((rows, batchIndex) => ({ rows, batchIndex, totalBatches }));
  }
}

/** Built-in data source type mapping (source type name → factory taking the source path). */
const BUILTIN_SOURCES: Record<string, (path: string) => ArpeggioDataSource> = {
  csv: (path) => new CsvDataSource(path),
};
/**
 * Dynamically import a module given a specifier or a file-system path.
 *
 * Absolute file-system paths are converted to `file://` URLs first: a raw
 * Windows drive path is not a valid import specifier, and characters such as
 * `#`, `?` or `%` in a raw path are interpreted as URL syntax.
 */
async function importModule(specifier: string): Promise<unknown> {
  const [{ isAbsolute }, { pathToFileURL }] = await Promise.all([
    import('node:path'),
    import('node:url'),
  ]);
  const target = isAbsolute(specifier) ? pathToFileURL(specifier).href : specifier;
  return import(target);
}

/**
 * Create a data source instance by type and path.
 *
 * For built-in types ('csv'), uses the registered factory.
 * For any other type, `sourceType` is treated as a JS module specifier whose
 * default export must be a factory `(path) => ArpeggioDataSource`.
 *
 * @param sourceType - Built-in type name or module specifier/path.
 * @param sourcePath - Path handed to the factory.
 * @throws Error when the custom module has no default factory function.
 */
export async function createDataSource(
  sourceType: string,
  sourcePath: string,
): Promise<ArpeggioDataSource> {
  // Own-property check: names inherited from Object.prototype ("toString",
  // "constructor", ...) must not be mistaken for registered factories.
  const builtinFactory = Object.prototype.hasOwnProperty.call(BUILTIN_SOURCES, sourceType)
    ? BUILTIN_SOURCES[sourceType]
    : undefined;
  if (builtinFactory) {
    return builtinFactory(sourcePath);
  }

  // NOTE(review): this executes arbitrary code named by the piece configuration.
  const module = (await importModule(sourceType)) as {
    default?: (path: string) => ArpeggioDataSource;
  };
  if (typeof module.default !== 'function') {
    throw new Error(
      `Custom data source module "${sourceType}" must export a default factory function`
    );
  }
  return module.default(sourcePath);
}

/**
 * Create a concat merge function with the given separator.
 * Failed results are dropped and the rest are joined in batch-index order.
 */
function createConcatMerge(separator: string): MergeFn {
  return (results) =>
    results
      .filter((r) => r.success) // filter() copies, so sort() below cannot reorder the caller's array
      .sort((a, b) => a.batchIndex - b.batchIndex)
      .map((r) => r.content)
      .join(separator);
}

/**
 * Create a merge function from inline JavaScript.
 *
 * The inline JS receives `results` as the function parameter (readonly BatchResult[]).
 * It must return a string.
 */
function createInlineJsMerge(jsBody: string): MergeFn {
  // NOTE(review): `new Function` runs the configured code with full process privileges.
  const fn = new Function('results', jsBody) as MergeFn;
  return (results) => {
    const output: unknown = fn(results);
    if (typeof output !== 'string') {
      throw new Error(`Inline JS merge function must return a string, got ${typeof output}`);
    }
    return output;
  };
}

/**
 * Create a merge function from an external JS file.
 *
 * The file must export a default function: (results: BatchResult[]) => string.
 * The returned wrapper rejects non-string output, matching the inline variant.
 */
async function createFileMerge(filePath: string): Promise<MergeFn> {
  const module = (await importModule(filePath)) as { default?: MergeFn };
  const fn = module.default;
  if (typeof fn !== 'function') {
    throw new Error(`Merge file "${filePath}" must export a default function`);
  }
  return (results) => {
    const output: unknown = fn(results);
    if (typeof output !== 'string') {
      throw new Error(`Merge file "${filePath}" default function must return a string, got ${typeof output}`);
    }
    return output;
  };
}

/**
 * Build a merge function from the arpeggio merge configuration.
 *
 * For 'concat' strategy: returns a simple join function (separator defaults to '\n').
 * For 'custom' strategy: loads from inline JS, otherwise from the external file.
 *
 * @throws Error when the custom strategy has neither inline JS nor a file path.
 */
export async function buildMergeFn(config: ArpeggioMergeMovementConfig): Promise<MergeFn> {
  if (config.strategy === 'concat') {
    return createConcatMerge(config.separator ?? '\n');
  }

  // Custom strategy
  if (config.inlineJs) {
    return createInlineJsMerge(config.inlineJs);
  }

  if (config.filePath) {
    return createFileMerge(config.filePath);
  }

  throw new Error('Custom merge strategy requires either inline_js or file path');
}

/** Write merged output to the given file as UTF-8, replacing any existing content. */
export function writeMergedOutput(outputPath: string, content: string): void {
  writeFileSync(outputPath, content, 'utf-8');
}
/**
 * Matches every supported placeholder in one pass:
 * group 1 = batch_index, group 2 = total_batches,
 * groups 3/4 = row number and column name of {col:N:name},
 * group 5 = row number of {line:N}.
 */
const PLACEHOLDER_PATTERN = /\{(?:(batch_index)|(total_batches)|col:(\d+):(\w+)|line:(\d+))\}/g;

/** Format a single data row as "key: value" lines */
function formatRow(row: DataRow): string {
  return Object.entries(row)
    .map(([key, value]) => `${key}: ${value}`)
    .join('\n');
}

/**
 * Look up the row addressed by a 1-based row number taken from a placeholder.
 *
 * @throws Error naming the placeholder when the number is outside the batch.
 */
function rowForPlaceholder(batch: DataBatch, indexStr: string, placeholder: string): DataRow {
  const rowIndex = parseInt(indexStr, 10) - 1;
  if (rowIndex < 0 || rowIndex >= batch.rows.length) {
    throw new Error(
      `Template placeholder ${placeholder} references row ${indexStr} but batch has ${batch.rows.length} rows`
    );
  }
  return batch.rows[rowIndex]!;
}

/**
 * Expand placeholders in a template string using batch data.
 *
 * Supported placeholders:
 * - {line:N} — Row N (1-based) formatted as "key: value" lines
 * - {col:N:name} — Column "name" from row N (1-based)
 * - {batch_index} — 0-based batch index
 * - {total_batches} — Total number of batches
 *
 * All placeholders are substituted in a single pass over the template, so text
 * that comes from the data (for example a cell containing "{line:1}") is
 * inserted verbatim and never expanded again.
 *
 * @throws Error when a placeholder references a row outside the batch or a
 *         column the row does not have.
 */
export function expandTemplate(template: string, batch: DataBatch): string {
  return template.replace(
    PLACEHOLDER_PATTERN,
    (
      match: string,
      batchIndexToken: string | undefined,
      totalBatchesToken: string | undefined,
      colRow: string | undefined,
      colName: string | undefined,
      lineRow: string | undefined,
    ): string => {
      if (batchIndexToken !== undefined) {
        return String(batch.batchIndex);
      }
      if (totalBatchesToken !== undefined) {
        return String(batch.totalBatches);
      }
      if (colRow !== undefined && colName !== undefined) {
        const row = rowForPlaceholder(batch, colRow, match);
        // Own-property check so inherited names such as "constructor" are not
        // mistaken for columns.
        const value = Object.prototype.hasOwnProperty.call(row, colName) ? row[colName] : undefined;
        if (value === undefined) {
          throw new Error(
            `Template placeholder {col:${colRow}:${colName}} references unknown column "${colName}"`
          );
        }
        return value;
      }
      // Only the {line:N} alternative remains.
      return formatRow(rowForPlaceholder(batch, lineRow ?? '', match));
    },
  );
}

/** Load a template file and return its content */
export function loadTemplate(templatePath: string): string {
  return readFileSync(templatePath, 'utf-8');
}

/**
 * Arpeggio movement internal type definitions.
 *
 * Configuration types (ArpeggioMovementConfig, ArpeggioMergeMovementConfig)
 * live in models/piece-types.ts as part of PieceMovement.
 * The types below are the runtime types used internally by the arpeggio module.
 */
export type {
  ArpeggioMovementConfig,
  ArpeggioMergeMovementConfig,
} from '../../models/piece-types.js';

/** A single row of data from a data source (column name → value) */
export type DataRow = Record<string, string>;

/** A batch of rows read from a data source */
export interface DataBatch {
  /** The rows in this batch */
  readonly rows: readonly DataRow[];
  /** 0-based index of this batch in the overall data set */
  readonly batchIndex: number;
  /** Total number of batches (known after full read) */
  readonly totalBatches: number;
}

/** Interface for data source implementations */
export interface ArpeggioDataSource {
  /** Read all batches from the data source. Returns an array of DataBatch. */
  readBatches(batchSize: number): Promise<DataBatch[]>;
}

/** Result of a single LLM call for one batch */
export interface BatchResult {
  /** 0-based index of the batch */
  readonly batchIndex: number;
  /** LLM response content */
  readonly content: string;
  /** Whether this result was successful */
  readonly success: boolean;
  /** Error message if failed */
  readonly error?: string;
}

/** Merge function signature: takes all batch results, returns merged string */
export type MergeFn = (results: readonly BatchResult[]) => string;
const log = createLogger('arpeggio-runner');

/** Collaborators the arpeggio runner needs from the engine. */
export interface ArpeggioRunnerDeps {
  /** Builds the agent options used for every batch call of a movement. */
  readonly optionsBuilder: OptionsBuilder;
  /** Working directory handed to rule detection. */
  readonly getCwd: () => string;
  /** Interactive flag handed to rule detection. */
  readonly getInteractive: () => boolean;
  readonly detectRuleIndex: (content: string, movementName: string) => number;
  readonly callAiJudge: (
    agentOutput: string,
    conditions: Array<{ index: number; text: string }>,
    options: { cwd: string }
  ) => Promise<number>;
  /** Invoked once per batch, before the batch is executed. */
  readonly onPhaseStart?: (step: PieceMovement, phase: 1 | 2 | 3, phaseName: PhaseName, instruction: string) => void;
  /** Invoked once per batch, after the batch finished (successfully or not). */
  readonly onPhaseComplete?: (step: PieceMovement, phase: 1 | 2 | 3, phaseName: PhaseName, content: string, status: string, error?: string) => void;
}

/**
 * Simple semaphore for controlling concurrency.
 * Limits the number of concurrent async operations.
 */
class Semaphore {
  private running = 0;
  private readonly waiting: Array<() => void> = [];

  constructor(private readonly maxConcurrency: number) {}

  /** Resolve immediately while a slot is free, otherwise once one is handed over. */
  async acquire(): Promise<void> {
    if (this.running < this.maxConcurrency) {
      this.running++;
      return;
    }
    return new Promise<void>((resolve) => {
      this.waiting.push(resolve);
    });
  }

  /** Give the slot to the oldest waiter, or free it when nobody is waiting. */
  release(): void {
    const next = this.waiting.shift();
    if (next) {
      // The slot passes straight to the waiter, so `running` stays unchanged.
      next();
    } else if (this.running > 0) {
      // Guarded so an unmatched release() cannot drive the counter negative.
      this.running--;
    }
  }
}

/**
 * Execute a single batch with retry logic.
 *
 * Never rejects: every failure is reported as a `BatchResult` with
 * `success: false`. The agent is called up to `maxRetries + 1` times, waiting
 * `retryDelayMs` between attempts. A template expansion error is reported
 * without calling the agent (the prompt is built once from fixed inputs, so a
 * retry could not succeed).
 */
async function executeBatchWithRetry(
  batch: DataBatch,
  template: string,
  persona: string | undefined,
  agentOptions: RunAgentOptions,
  maxRetries: number,
  retryDelayMs: number,
): Promise<BatchResult> {
  const failure = (error: string | undefined): BatchResult => ({
    batchIndex: batch.batchIndex,
    content: '',
    success: false,
    error,
  });

  let prompt: string;
  try {
    prompt = expandTemplate(template, batch);
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    log.info('Batch template expansion failed', {
      batchIndex: batch.batchIndex,
      error: message,
    });
    return failure(message);
  }

  let lastError: string | undefined;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const response = await runAgent(persona, prompt, agentOptions);
      if (response.status !== 'error') {
        return {
          batchIndex: batch.batchIndex,
          content: response.content,
          success: true,
        };
      }
      lastError = response.error ?? response.content ?? 'Agent returned error status';
    } catch (error) {
      lastError = error instanceof Error ? error.message : String(error);
    }

    const willRetry = attempt < maxRetries;
    log.info(willRetry ? 'Batch execution failed, retrying' : 'Batch execution failed, retries exhausted', {
      batchIndex: batch.batchIndex,
      attempt: attempt + 1,
      maxRetries,
      error: lastError,
    });
    if (willRetry) {
      await delay(retryDelayMs);
    }
  }

  return failure(lastError);
}

/** Resolve after `ms` milliseconds. */
function delay(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Executes arpeggio piece movements: data-driven batch processing.
 */
export class ArpeggioRunner {
  constructor(
    private readonly deps: ArpeggioRunnerDeps,
  ) {}

  /**
   * Run an arpeggio movement: read data, expand templates, call LLM,
   * merge results, and return an aggregated response.
   *
   * Side effects: increments the movement iteration, stores the aggregated
   * response in `state.movementOutputs` / `state.lastOutput`, and writes the
   * merged text to `outputPath` when one is configured.
   *
   * @throws Error when the movement has no arpeggio configuration, the data
   *         source yields no batches, or any batch failed.
   */
  async runArpeggioMovement(
    step: PieceMovement,
    state: PieceState,
  ): Promise<{ response: AgentResponse; instruction: string }> {
    const arpeggioConfig = step.arpeggio;
    if (!arpeggioConfig) {
      throw new Error(`Movement "${step.name}" has no arpeggio configuration`);
    }

    const movementIteration = incrementMovementIteration(state, step.name);
    log.debug('Running arpeggio movement', {
      movement: step.name,
      source: arpeggioConfig.source,
      batchSize: arpeggioConfig.batchSize,
      concurrency: arpeggioConfig.concurrency,
      movementIteration,
    });

    const dataSource = await createDataSource(arpeggioConfig.source, arpeggioConfig.sourcePath);
    const batches = await dataSource.readBatches(arpeggioConfig.batchSize);

    if (batches.length === 0) {
      throw new Error(`Data source returned no batches for movement "${step.name}"`);
    }

    log.info('Arpeggio data loaded', {
      movement: step.name,
      batchCount: batches.length,
      batchSize: arpeggioConfig.batchSize,
    });

    const template = loadTemplate(arpeggioConfig.templatePath);

    const agentOptions = this.deps.optionsBuilder.buildAgentOptions(step);
    const semaphore = new Semaphore(arpeggioConfig.concurrency);
    const results = await this.executeBatches(
      batches,
      template,
      step,
      agentOptions,
      arpeggioConfig,
      semaphore,
    );

    // Any failed batch fails the whole movement; nothing is merged or written.
    const failedBatches = results.filter((r) => !r.success);
    if (failedBatches.length > 0) {
      const errorDetails = failedBatches
        .map((r) => `batch ${r.batchIndex}: ${r.error}`)
        .join('; ');
      throw new Error(
        `Arpeggio movement "${step.name}" failed: ${failedBatches.length}/${results.length} batches failed (${errorDetails})`
      );
    }

    const mergeFn = await buildMergeFn(arpeggioConfig.merge);
    const mergedContent = mergeFn(results);

    if (arpeggioConfig.outputPath) {
      writeMergedOutput(arpeggioConfig.outputPath, mergedContent);
      log.info('Arpeggio output written', { outputPath: arpeggioConfig.outputPath });
    }

    const ruleCtx = {
      state,
      cwd: this.deps.getCwd(),
      interactive: this.deps.getInteractive(),
      detectRuleIndex: this.deps.detectRuleIndex,
      callAiJudge: this.deps.callAiJudge,
    };
    const match = await detectMatchedRule(step, mergedContent, '', ruleCtx);

    const aggregatedResponse: AgentResponse = {
      persona: step.name,
      status: 'done',
      content: mergedContent,
      timestamp: new Date(),
      ...(match && { matchedRuleIndex: match.index, matchedRuleMethod: match.method }),
    };

    state.movementOutputs.set(step.name, aggregatedResponse);
    state.lastOutput = aggregatedResponse;

    const instruction = `[Arpeggio] ${step.name}: ${batches.length} batches, source=${arpeggioConfig.source}`;

    return { response: aggregatedResponse, instruction };
  }

  /**
   * Execute all batches with concurrency control.
   * Results come back in the same order as `batches`; the phase callbacks
   * are invoked around each batch.
   */
  private async executeBatches(
    batches: readonly DataBatch[],
    template: string,
    step: PieceMovement,
    agentOptions: RunAgentOptions,
    config: ArpeggioMovementConfig,
    semaphore: Semaphore,
  ): Promise<BatchResult[]> {
    const promises = batches.map(async (batch) => {
      await semaphore.acquire();
      try {
        this.deps.onPhaseStart?.(step, 1, 'execute', `[Arpeggio batch ${batch.batchIndex + 1}/${batch.totalBatches}]`);
        const result = await executeBatchWithRetry(
          batch,
          template,
          step.persona,
          agentOptions,
          config.maxRetries,
          config.retryDelayMs,
        );
        this.deps.onPhaseComplete?.(
          step, 1, 'execute',
          result.content,
          result.success ? 'done' : 'error',
          result.error,
        );
        return result;
      } finally {
        // Always free the slot, even if a callback throws.
        semaphore.release();
      }
    });

    return Promise.all(promises);
  }
}
-290,7 +306,7 @@ export class PieceEngine extends EventEmitter { } } - /** Run a single movement (delegates to ParallelRunner if movement has parallel sub-movements) */ + /** Run a single movement (delegates to ParallelRunner, ArpeggioRunner, or MovementExecutor) */ private async runMovement(step: PieceMovement, prebuiltInstruction?: string): Promise<{ response: AgentResponse; instruction: string }> { const updateSession = this.updatePersonaSession.bind(this); let result: { response: AgentResponse; instruction: string }; @@ -299,6 +315,10 @@ export class PieceEngine extends EventEmitter { result = await this.parallelRunner.runParallelMovement( step, this.state, this.task, this.config.maxIterations, updateSession, ); + } else if (step.arpeggio) { + result = await this.arpeggioRunner.runArpeggioMovement( + step, this.state, + ); } else { result = await this.movementExecutor.runNormalMovement( step, this.state, this.task, this.config.maxIterations, updateSession, prebuiltInstruction, @@ -492,10 +512,11 @@ export class PieceEngine extends EventEmitter { this.state.iteration++; - // Build instruction before emitting movement:start so listeners can log it - const isParallel = movement.parallel && movement.parallel.length > 0; + // Build instruction before emitting movement:start so listeners can log it. + // Parallel and arpeggio movements handle iteration incrementing internally. 
+ const isDelegated = (movement.parallel && movement.parallel.length > 0) || !!movement.arpeggio; let prebuiltInstruction: string | undefined; - if (!isParallel) { + if (!isDelegated) { const movementIteration = incrementMovementIteration(this.state, movement.name); prebuiltInstruction = this.movementExecutor.buildInstruction( movement, movementIteration, this.state, this.task, this.config.maxIterations, diff --git a/src/core/piece/engine/index.ts b/src/core/piece/engine/index.ts index f94c0b4..505be71 100644 --- a/src/core/piece/engine/index.ts +++ b/src/core/piece/engine/index.ts @@ -8,6 +8,7 @@ export { PieceEngine } from './PieceEngine.js'; export { MovementExecutor } from './MovementExecutor.js'; export type { MovementExecutorDeps } from './MovementExecutor.js'; export { ParallelRunner } from './ParallelRunner.js'; +export { ArpeggioRunner } from './ArpeggioRunner.js'; export { OptionsBuilder } from './OptionsBuilder.js'; export { CycleDetector } from './cycle-detector.js'; export type { CycleCheckResult } from './cycle-detector.js'; diff --git a/src/infra/config/loaders/pieceParser.ts b/src/infra/config/loaders/pieceParser.ts index 87d5039..800c870 100644 --- a/src/infra/config/loaders/pieceParser.ts +++ b/src/infra/config/loaders/pieceParser.ts @@ -6,11 +6,11 @@ */ import { readFileSync, existsSync } from 'node:fs'; -import { dirname } from 'node:path'; +import { dirname, resolve } from 'node:path'; import { parse as parseYaml } from 'yaml'; import type { z } from 'zod'; import { PieceConfigRawSchema, PieceMovementRawSchema } from '../../../core/models/index.js'; -import type { PieceConfig, PieceMovement, PieceRule, OutputContractEntry, OutputContractLabelPath, OutputContractItem, LoopMonitorConfig, LoopMonitorJudge } from '../../../core/models/index.js'; +import type { PieceConfig, PieceMovement, PieceRule, OutputContractEntry, OutputContractLabelPath, OutputContractItem, LoopMonitorConfig, LoopMonitorJudge, ArpeggioMovementConfig, 
/**
 * Normalize raw arpeggio config from YAML into internal format.
 *
 * Converts snake_case keys to camelCase and resolves every file path against
 * the directory of the piece file. A `source` written as a relative module
 * path (starting with `./` or `../`) is resolved the same way, so a custom
 * data source module is found next to the piece instead of being looked up
 * relative to the module that later imports it. Built-in type names such as
 * 'csv', bare module specifiers and absolute paths are passed through as-is.
 *
 * @param raw - Schema-validated raw arpeggio section, or undefined when absent.
 * @param pieceDir - Directory containing the piece definition file.
 * @returns The normalized configuration, or undefined when `raw` is absent.
 */
function normalizeArpeggio(
  raw: RawStep['arpeggio'],
  pieceDir: string,
): ArpeggioMovementConfig | undefined {
  if (!raw) return undefined;

  const fromPieceDir = (path: string): string => resolve(pieceDir, path);

  // Missing merge section means plain concatenation.
  const merge: ArpeggioMergeMovementConfig = raw.merge
    ? {
        strategy: raw.merge.strategy,
        inlineJs: raw.merge.inline_js,
        filePath: raw.merge.file ? fromPieceDir(raw.merge.file) : undefined,
        separator: raw.merge.separator,
      }
    : { strategy: 'concat' };

  // "./x.js" or "../x.js" (either slash style) is a path, not a type name.
  const sourceIsRelativePath = /^\.{1,2}[\\/]/.test(raw.source);

  return {
    source: sourceIsRelativePath ? fromPieceDir(raw.source) : raw.source,
    sourcePath: fromPieceDir(raw.source_path),
    batchSize: raw.batch_size,
    concurrency: raw.concurrency,
    templatePath: fromPieceDir(raw.template),
    merge,
    maxRetries: raw.max_retries,
    retryDelayMs: raw.retry_delay_ms,
    outputPath: raw.output_path ? fromPieceDir(raw.output_path) : undefined,
  };
}