From 1acd991e7e01628a821168d1a45389daab7df1e3 Mon Sep 17 00:00:00 2001 From: nrs <38722970+nrslib@users.noreply.github.com> Date: Sun, 22 Feb 2026 21:06:29 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20pipeline=20=E3=83=A2=E3=83=BC=E3=83=89?= =?UTF-8?q?=E3=81=A7=E3=81=AE=20Slack=20=E9=80=9A=E7=9F=A5=E3=82=92?= =?UTF-8?q?=E5=BC=B7=E5=8C=96=20(#346)=20(#347)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: pipeline モードでの Slack 通知を try/finally パターンで実装 - executePipeline の本体を try/finally で囲み、全終了パスで通知を送信 - PipelineResult でスプレッド演算子による不変状態追跡 - notifySlack ヘルパーで webhook 未設定時は即 return - 既存の早期リターンパターンを保持したまま通知機能を追加 * refactor: executePipeline のオーケストレーションと各ステップを分離 - execute.ts: オーケストレーション + Slack 通知 (157行) - steps.ts: 5つのステップ関数 + テンプレートヘルパー (233行) - runPipeline で全ステップを同じ抽象レベルに揃えた - buildResult ヘルパーで let 再代入を最小化 * test: commitAndPush の git 操作失敗時の exit code 4 テストを追加 --- src/__tests__/pipelineExecution.test.ts | 132 +++++++++- src/features/pipeline/execute.ts | 328 ++++++++---------------- src/features/pipeline/steps.ts | 233 +++++++++++++++++ 3 files changed, 472 insertions(+), 221 deletions(-) create mode 100644 src/features/pipeline/steps.ts diff --git a/src/__tests__/pipelineExecution.test.ts b/src/__tests__/pipelineExecution.test.ts index 5a4d918..8f90869 100644 --- a/src/__tests__/pipelineExecution.test.ts +++ b/src/__tests__/pipelineExecution.test.ts @@ -57,7 +57,10 @@ vi.mock('../shared/ui/index.js', () => ({ warn: vi.fn(), debug: vi.fn(), })); -// Mock debug logger +// Mock Slack + utils +const mockGetSlackWebhookUrl = vi.fn<() => string | undefined>(() => undefined); +const mockSendSlackNotification = vi.fn<(url: string, message: string) => Promise>(); +const mockBuildSlackRunSummary = vi.fn<(params: unknown) => string>(() => 'TAKT Run Summary'); vi.mock('../shared/utils/index.js', async (importOriginal) => ({ ...(await importOriginal>()), createLogger: () => ({ @@ -65,6 +68,14 @@ vi.mock('../shared/utils/index.js', async (importOriginal) => ({ debug: vi.fn(), error: vi.fn(), }), + getSlackWebhookUrl: (...args: unknown[]) => mockGetSlackWebhookUrl(...args as []), + sendSlackNotification: (...args: unknown[]) => mockSendSlackNotification(...(args as [string, string])), + buildSlackRunSummary: (...args: unknown[]) => mockBuildSlackRunSummary(...(args as [unknown])), +})); + +// Mock generateRunId +vi.mock('../features/tasks/execute/slackSummaryAdapter.js', () => ({ + generateRunId: () => 'run-20260222-000000', })); const { executePipeline } = await import('../features/pipeline/index.js'); @@ -72,6 +83,8 @@ const { executePipeline } = await import('../features/pipeline/index.js'); describe('executePipeline', () => { beforeEach(() => { vi.clearAllMocks(); + // Default: no Slack webhook + mockGetSlackWebhookUrl.mockReturnValue(undefined); // Default: git operations succeed mockExecFileSync.mockReturnValue('abc1234\n'); // Default: no pipeline config @@ -675,4 +688,121 @@ describe('executePipeline', () => { ); }); }); + + it('should return exit code 4 when git commit/push fails', async () => { + mockExecuteTask.mockResolvedValueOnce(true); + // stageAndCommit calls execFileSync('git', ['add', ...]) then ('git', ['commit', ...]) + // Make the commit call throw + mockExecFileSync.mockImplementation((_cmd: string, args: string[]) => { + if (args[0] === 'commit') { + throw new Error('nothing to commit'); + } + return 'abc1234\n'; + }); + + const exitCode = await executePipeline({ + task: 'Fix the bug', + piece: 'default', + branch: 'fix/my-branch', + autoPr: false, + cwd: '/tmp/test', + }); + + expect(exitCode).toBe(4); + }); + + describe('Slack notification', () => { + it('should not send Slack notification when webhook is not configured', async () => { + mockGetSlackWebhookUrl.mockReturnValue(undefined); + mockExecuteTask.mockResolvedValueOnce(true); + + await executePipeline({ + task: 'Fix the bug', + piece: 'default', + autoPr: false, + skipGit: true, + cwd: '/tmp/test', + }); + + expect(mockSendSlackNotification).not.toHaveBeenCalled(); + }); + + it('should send success notification when webhook is configured', async () => { + mockGetSlackWebhookUrl.mockReturnValue('https://hooks.slack.com/test'); + mockExecuteTask.mockResolvedValueOnce(true); + + await executePipeline({ + task: 'Fix the bug', + piece: 'default', + autoPr: false, + skipGit: true, + cwd: '/tmp/test', + }); + + expect(mockBuildSlackRunSummary).toHaveBeenCalledWith( + expect.objectContaining({ + runId: 'run-20260222-000000', + total: 1, + success: 1, + failed: 0, + concurrency: 1, + tasks: [expect.objectContaining({ + name: 'pipeline', + success: true, + piece: 'default', + })], + }), + ); + expect(mockSendSlackNotification).toHaveBeenCalledWith( + 'https://hooks.slack.com/test', + 'TAKT Run Summary', + ); + }); + + it('should send failure notification when piece fails', async () => { + mockGetSlackWebhookUrl.mockReturnValue('https://hooks.slack.com/test'); + mockExecuteTask.mockResolvedValueOnce(false); + + await executePipeline({ + task: 'Fix the bug', + piece: 'default', + autoPr: false, + skipGit: true, + cwd: '/tmp/test', + }); + + expect(mockBuildSlackRunSummary).toHaveBeenCalledWith( + expect.objectContaining({ + success: 0, + failed: 1, + tasks: [expect.objectContaining({ + success: false, + })], + }), + ); + expect(mockSendSlackNotification).toHaveBeenCalled(); + }); + + it('should include PR URL in notification when auto-pr succeeds', async () => { + mockGetSlackWebhookUrl.mockReturnValue('https://hooks.slack.com/test'); + mockExecuteTask.mockResolvedValueOnce(true); + mockCreatePullRequest.mockReturnValueOnce({ success: true, url: 'https://github.com/test/pr/99' }); + + await executePipeline({ + task: 'Fix the bug', + piece: 'default', + branch: 'fix/test', + autoPr: true, + cwd: '/tmp/test', + }); + + expect(mockBuildSlackRunSummary).toHaveBeenCalledWith( + expect.objectContaining({ + tasks: [expect.objectContaining({ + prUrl: 'https://github.com/test/pr/99', + })], + }), + ); + }); + }); }); diff --git a/src/features/pipeline/execute.ts b/src/features/pipeline/execute.ts index 19b68ad..70182a8 100644 --- a/src/features/pipeline/execute.ts +++ b/src/features/pipeline/execute.ts @@ -1,269 +1,157 @@ /** - * Pipeline execution flow + * Pipeline orchestration * - * Orchestrates the full pipeline: - * 1. Fetch issue content - * 2. Create branch + * Thin orchestrator that coordinates pipeline steps: + * 1. Resolve task content + * 2. Prepare execution environment * 3. Run piece * 4. Commit & push * 5. Create PR + * + * Each step is implemented in steps.ts. */ -import { execFileSync } from 'node:child_process'; -import { - fetchIssue, - formatIssueAsTask, - checkGhCli, - createPullRequest, - pushBranch, - buildPrBody, - type GitHubIssue, -} from '../../infra/github/index.js'; -import { stageAndCommit, resolveBaseBranch } from '../../infra/task/index.js'; -import { executeTask, confirmAndCreateWorktree, type TaskExecutionOptions, type PipelineExecutionOptions } from '../tasks/index.js'; import { resolveConfigValues } from '../../infra/config/index.js'; -import { info, error, success, status, blankLine } from '../../shared/ui/index.js'; -import { createLogger, getErrorMessage } from '../../shared/utils/index.js'; -import type { PipelineConfig } from '../../core/models/index.js'; +import { info, error, status, blankLine } from '../../shared/ui/index.js'; +import { createLogger, getErrorMessage, getSlackWebhookUrl, sendSlackNotification, buildSlackRunSummary } from '../../shared/utils/index.js'; +import type { SlackTaskDetail } from '../../shared/utils/index.js'; +import { generateRunId } from '../tasks/execute/slackSummaryAdapter.js'; +import type { PipelineExecutionOptions } from '../tasks/index.js'; import { EXIT_ISSUE_FETCH_FAILED, EXIT_PIECE_FAILED, EXIT_GIT_OPERATION_FAILED, EXIT_PR_CREATION_FAILED, } from '../../shared/exitCodes.js'; +import { + resolveTaskContent, + resolveExecutionContext, + runPiece, + commitAndPush, + submitPullRequest, + buildCommitMessage, + type ExecutionContext, +} from './steps.js'; export type { PipelineExecutionOptions }; const log = createLogger('pipeline'); -/** - * Expand template variables in a string. - * Supported: {title}, {issue}, {issue_body}, {report} - */ -function expandTemplate(template: string, vars: Record): string { - return template.replace(/\{(\w+)\}/g, (match, key: string) => vars[key] ?? match); +// ---- Pipeline orchestration ---- + +interface PipelineOutcome { + exitCode: number; + result: PipelineResult; } -/** Generate a branch name for pipeline execution */ -function generatePipelineBranchName(pipelineConfig: PipelineConfig | undefined, issueNumber?: number): string { - const prefix = pipelineConfig?.defaultBranchPrefix ?? 'takt/'; - const timestamp = Math.floor(Date.now() / 1000); - if (issueNumber) { - return `${prefix}issue-${issueNumber}-${timestamp}`; - } - return `${prefix}pipeline-${timestamp}`; -} +async function runPipeline(options: PipelineExecutionOptions): Promise { + const { cwd, piece, autoPr, skipGit } = options; + const pipelineConfig = resolveConfigValues(cwd, ['pipeline']).pipeline; -/** Create and checkout a new branch */ -function createBranch(cwd: string, branch: string): void { - execFileSync('git', ['checkout', '-b', branch], { - cwd, - stdio: 'pipe', + const buildResult = (overrides: Partial = {}): PipelineResult => ({ + success: false, piece, issueNumber: options.issueNumber, ...overrides, }); -} -/** Build commit message from template or defaults */ -function buildCommitMessage( - pipelineConfig: PipelineConfig | undefined, - issue: GitHubIssue | undefined, - taskText: string | undefined, -): string { - const template = pipelineConfig?.commitMessageTemplate; - if (template && issue) { - return expandTemplate(template, { - title: issue.title, - issue: String(issue.number), - }); + // Step 1: Resolve task content + const taskContent = resolveTaskContent(options); + if (!taskContent) return { exitCode: EXIT_ISSUE_FETCH_FAILED, result: buildResult() }; + + // Step 2: Prepare execution environment + let context: ExecutionContext; + try { + context = await resolveExecutionContext(cwd, taskContent.task, options, pipelineConfig); + } catch (err) { + error(`Failed to prepare execution environment: ${getErrorMessage(err)}`); + return { exitCode: EXIT_GIT_OPERATION_FAILED, result: buildResult() }; } - // Default commit message - return issue - ? `feat: ${issue.title} (#${issue.number})` - : `takt: ${taskText ?? 'pipeline task'}`; -} -/** Build PR body from template or defaults */ -function buildPipelinePrBody( - pipelineConfig: PipelineConfig | undefined, - issue: GitHubIssue | undefined, - report: string, -): string { - const template = pipelineConfig?.prBodyTemplate; - if (template && issue) { - return expandTemplate(template, { - title: issue.title, - issue: String(issue.number), - issue_body: issue.body || issue.title, - report, - }); - } - return buildPrBody(issue ? [issue] : undefined, report); -} + // Step 3: Run piece + log.info('Pipeline piece execution starting', { piece, branch: context.branch, skipGit, issueNumber: options.issueNumber }); + const pieceOk = await runPiece(cwd, piece, taskContent.task, context.execCwd, options); + if (!pieceOk) return { exitCode: EXIT_PIECE_FAILED, result: buildResult({ branch: context.branch }) }; -interface ExecutionContext { - execCwd: string; - branch?: string; - baseBranch?: string; - isWorktree: boolean; -} - -/** - * Resolve the execution environment for the pipeline. - * Creates a worktree, a branch, or uses the current directory as-is. - */ -async function resolveExecutionContext( - cwd: string, - task: string, - options: Pick, - pipelineConfig: PipelineConfig | undefined, -): Promise { - if (options.createWorktree) { - const result = await confirmAndCreateWorktree(cwd, task, options.createWorktree); - if (result.isWorktree) { - success(`Worktree created: ${result.execCwd}`); + // Step 4: Commit & push + if (!skipGit && context.branch) { + const commitMessage = buildCommitMessage(pipelineConfig, taskContent.issue, options.task); + if (!commitAndPush(context.execCwd, cwd, context.branch, commitMessage, context.isWorktree)) { + return { exitCode: EXIT_GIT_OPERATION_FAILED, result: buildResult({ branch: context.branch }) }; } - return { execCwd: result.execCwd, branch: result.branch, baseBranch: result.baseBranch, isWorktree: result.isWorktree }; } - if (options.skipGit) { - return { execCwd: cwd, isWorktree: false }; + // Step 5: Create PR + let prUrl: string | undefined; + if (autoPr && !skipGit && context.branch) { + prUrl = submitPullRequest(cwd, context.branch, context.baseBranch, taskContent, piece, pipelineConfig, options); + if (!prUrl) return { exitCode: EXIT_PR_CREATION_FAILED, result: buildResult({ branch: context.branch }) }; + } else if (autoPr && skipGit) { + info('--auto-pr is ignored when --skip-git is specified (no push was performed)'); } - const resolved = resolveBaseBranch(cwd); - const branch = options.branch ?? generatePipelineBranchName(pipelineConfig, options.issueNumber); - info(`Creating branch: ${branch}`); - createBranch(cwd, branch); - success(`Branch created: ${branch}`); - return { execCwd: cwd, branch, baseBranch: resolved.branch, isWorktree: false }; + // Summary + blankLine(); + status('Issue', taskContent.issue ? `#${taskContent.issue.number} "${taskContent.issue.title}"` : 'N/A'); + status('Branch', context.branch ?? '(current)'); + status('Piece', piece); + status('Result', 'Success', 'green'); + + return { exitCode: 0, result: buildResult({ success: true, branch: context.branch, prUrl }) }; } +// ---- Public API ---- + /** * Execute the full pipeline. * * Returns a process exit code (0 on success, 2-5 on specific failures). */ export async function executePipeline(options: PipelineExecutionOptions): Promise { - const { cwd, piece, autoPr, draftPr, skipGit } = options; - const globalConfig = resolveConfigValues(cwd, ['pipeline']); - const pipelineConfig = globalConfig.pipeline; - let issue: GitHubIssue | undefined; - let task: string; + const startTime = Date.now(); + const runId = generateRunId(); + let pipelineResult: PipelineResult = { success: false, piece: options.piece, issueNumber: options.issueNumber }; - // --- Step 1: Resolve task content --- - if (options.issueNumber) { - info(`Fetching issue #${options.issueNumber}...`); - try { - const ghStatus = checkGhCli(); - if (!ghStatus.available) { - error(ghStatus.error ?? 'gh CLI is not available'); - return EXIT_ISSUE_FETCH_FAILED; - } - issue = fetchIssue(options.issueNumber); - task = formatIssueAsTask(issue); - success(`Issue #${options.issueNumber} fetched: "${issue.title}"`); - } catch (err) { - error(`Failed to fetch issue #${options.issueNumber}: ${getErrorMessage(err)}`); - return EXIT_ISSUE_FETCH_FAILED; - } - } else if (options.task) { - task = options.task; - } else { - error('Either --issue or --task must be specified'); - return EXIT_ISSUE_FETCH_FAILED; - } - - // --- Step 2: Prepare execution environment --- - let context: ExecutionContext; try { - context = await resolveExecutionContext(cwd, task, options, pipelineConfig); - } catch (err) { - error(`Failed to prepare execution environment: ${getErrorMessage(err)}`); - return EXIT_GIT_OPERATION_FAILED; + const outcome = await runPipeline(options); + pipelineResult = outcome.result; + return outcome.exitCode; + } finally { + await notifySlack(runId, startTime, pipelineResult); } - const { execCwd, branch, baseBranch, isWorktree } = context; +} - // --- Step 3: Run piece --- - info(`Running piece: ${piece}`); - log.info('Pipeline piece execution starting', { piece, branch, skipGit, issueNumber: options.issueNumber }); +// ---- Slack notification ---- - const agentOverrides: TaskExecutionOptions | undefined = (options.provider || options.model) - ? { provider: options.provider, model: options.model } - : undefined; +interface PipelineResult { + success: boolean; + piece: string; + issueNumber?: number; + branch?: string; + prUrl?: string; +} - const taskSuccess = await executeTask({ - task, - cwd: execCwd, - pieceIdentifier: piece, - projectCwd: cwd, - agentOverrides, +/** Send Slack notification if webhook is configured. Never throws. */ +async function notifySlack(runId: string, startTime: number, result: PipelineResult): Promise { + const webhookUrl = getSlackWebhookUrl(); + if (!webhookUrl) return; + + const durationSec = Math.round((Date.now() - startTime) / 1000); + const task: SlackTaskDetail = { + name: 'pipeline', + success: result.success, + piece: result.piece, + issueNumber: result.issueNumber, + durationSec, + branch: result.branch, + prUrl: result.prUrl, + }; + const message = buildSlackRunSummary({ + runId, + total: 1, + success: result.success ? 1 : 0, + failed: result.success ? 0 : 1, + durationSec, + concurrency: 1, + tasks: [task], }); - if (!taskSuccess) { - error(`Piece '${piece}' failed`); - return EXIT_PIECE_FAILED; - } - success(`Piece '${piece}' completed`); - - // --- Step 4: Commit & push (skip if --skip-git) --- - if (!skipGit && branch) { - const commitMessage = buildCommitMessage(pipelineConfig, issue, options.task); - - info('Committing changes...'); - try { - const commitHash = stageAndCommit(execCwd, commitMessage); - if (commitHash) { - success(`Changes committed: ${commitHash}`); - } else { - info('No changes to commit'); - } - - if (isWorktree) { - // Clone has no origin — push to main project via path, then project pushes to origin - execFileSync('git', ['push', cwd, 'HEAD'], { cwd: execCwd, stdio: 'pipe' }); - } - - info(`Pushing to origin/${branch}...`); - pushBranch(cwd, branch); - success(`Pushed to origin/${branch}`); - } catch (err) { - error(`Git operation failed: ${getErrorMessage(err)}`); - return EXIT_GIT_OPERATION_FAILED; - } - } - - // --- Step 5: Create PR (if --auto-pr) --- - if (autoPr) { - if (skipGit) { - info('--auto-pr is ignored when --skip-git is specified (no push was performed)'); - } else if (branch) { - info('Creating pull request...'); - const prTitle = issue ? issue.title : (options.task ?? 'Pipeline task'); - const report = `Piece \`${piece}\` completed successfully.`; - const prBody = buildPipelinePrBody(pipelineConfig, issue, report); - - const prResult = createPullRequest(cwd, { - branch, - title: prTitle, - body: prBody, - base: baseBranch, - repo: options.repo, - draft: draftPr, - }); - - if (prResult.success) { - success(`PR created: ${prResult.url}`); - } else { - error(`PR creation failed: ${prResult.error}`); - return EXIT_PR_CREATION_FAILED; - } - } - } - - // --- Summary --- - blankLine(); - status('Issue', issue ? `#${issue.number} "${issue.title}"` : 'N/A'); - status('Branch', branch ?? '(current)'); - status('Piece', piece); - status('Result', 'Success', 'green'); - - return 0; + await sendSlackNotification(webhookUrl, message); } diff --git a/src/features/pipeline/steps.ts b/src/features/pipeline/steps.ts new file mode 100644 index 0000000..48c23c0 --- /dev/null +++ b/src/features/pipeline/steps.ts @@ -0,0 +1,233 @@ +/** + * Pipeline step implementations + * + * Each function encapsulates one step of the pipeline, + * keeping the orchestrator at a consistent abstraction level. + */ + +import { execFileSync } from 'node:child_process'; +import { + fetchIssue, + formatIssueAsTask, + checkGhCli, + createPullRequest, + pushBranch, + buildPrBody, + type GitHubIssue, +} from '../../infra/github/index.js'; +import { stageAndCommit, resolveBaseBranch } from '../../infra/task/index.js'; +import { executeTask, confirmAndCreateWorktree, type TaskExecutionOptions, type PipelineExecutionOptions } from '../tasks/index.js'; +import { info, error, success } from '../../shared/ui/index.js'; +import { getErrorMessage } from '../../shared/utils/index.js'; +import type { PipelineConfig } from '../../core/models/index.js'; + +// ---- Types ---- + +export interface TaskContent { + task: string; + issue?: GitHubIssue; +} + +export interface ExecutionContext { + execCwd: string; + branch?: string; + baseBranch?: string; + isWorktree: boolean; +} + +// ---- Template helpers ---- + +function expandTemplate(template: string, vars: Record): string { + return template.replace(/\{(\w+)\}/g, (match, key: string) => vars[key] ?? match); +} + +function generatePipelineBranchName(pipelineConfig: PipelineConfig | undefined, issueNumber?: number): string { + const prefix = pipelineConfig?.defaultBranchPrefix ?? 'takt/'; + const timestamp = Math.floor(Date.now() / 1000); + return issueNumber + ? `${prefix}issue-${issueNumber}-${timestamp}` + : `${prefix}pipeline-${timestamp}`; +} + +export function buildCommitMessage( + pipelineConfig: PipelineConfig | undefined, + issue: GitHubIssue | undefined, + taskText: string | undefined, +): string { + const template = pipelineConfig?.commitMessageTemplate; + if (template && issue) { + return expandTemplate(template, { + title: issue.title, + issue: String(issue.number), + }); + } + return issue + ? `feat: ${issue.title} (#${issue.number})` + : `takt: ${taskText ?? 'pipeline task'}`; +} + +function buildPipelinePrBody( + pipelineConfig: PipelineConfig | undefined, + issue: GitHubIssue | undefined, + report: string, +): string { + const template = pipelineConfig?.prBodyTemplate; + if (template && issue) { + return expandTemplate(template, { + title: issue.title, + issue: String(issue.number), + issue_body: issue.body || issue.title, + report, + }); + } + return buildPrBody(issue ? [issue] : undefined, report); +} + +// ---- Step 1: Resolve task content ---- + +export function resolveTaskContent(options: PipelineExecutionOptions): TaskContent | undefined { + if (options.issueNumber) { + info(`Fetching issue #${options.issueNumber}...`); + const ghStatus = checkGhCli(); + if (!ghStatus.available) { + error(ghStatus.error ?? 'gh CLI is not available'); + return undefined; + } + try { + const issue = fetchIssue(options.issueNumber); + const task = formatIssueAsTask(issue); + success(`Issue #${options.issueNumber} fetched: "${issue.title}"`); + return { task, issue }; + } catch (err) { + error(`Failed to fetch issue #${options.issueNumber}: ${getErrorMessage(err)}`); + return undefined; + } + } + if (options.task) { + return { task: options.task }; + } + error('Either --issue or --task must be specified'); + return undefined; +} + +// ---- Step 2: Resolve execution context ---- + +export async function resolveExecutionContext( + cwd: string, + task: string, + options: Pick, + pipelineConfig: PipelineConfig | undefined, +): Promise { + if (options.createWorktree) { + const result = await confirmAndCreateWorktree(cwd, task, options.createWorktree); + if (result.isWorktree) { + success(`Worktree created: ${result.execCwd}`); + } + return { execCwd: result.execCwd, branch: result.branch, baseBranch: result.baseBranch, isWorktree: result.isWorktree }; + } + if (options.skipGit) { + return { execCwd: cwd, isWorktree: false }; + } + const resolved = resolveBaseBranch(cwd); + const branch = options.branch ?? generatePipelineBranchName(pipelineConfig, options.issueNumber); + info(`Creating branch: ${branch}`); + execFileSync('git', ['checkout', '-b', branch], { cwd, stdio: 'pipe' }); + success(`Branch created: ${branch}`); + return { execCwd: cwd, branch, baseBranch: resolved.branch, isWorktree: false }; +} + +// ---- Step 3: Run piece ---- + +export async function runPiece( + projectCwd: string, + piece: string, + task: string, + execCwd: string, + options: Pick, +): Promise { + info(`Running piece: ${piece}`); + const agentOverrides: TaskExecutionOptions | undefined = (options.provider || options.model) + ? { provider: options.provider, model: options.model } + : undefined; + + const taskSuccess = await executeTask({ + task, + cwd: execCwd, + pieceIdentifier: piece, + projectCwd, + agentOverrides, + }); + + if (!taskSuccess) { + error(`Piece '${piece}' failed`); + return false; + } + success(`Piece '${piece}' completed`); + return true; +} + +// ---- Step 4: Commit & push ---- + +export function commitAndPush( + execCwd: string, + projectCwd: string, + branch: string, + commitMessage: string, + isWorktree: boolean, +): boolean { + info('Committing changes...'); + try { + const commitHash = stageAndCommit(execCwd, commitMessage); + if (commitHash) { + success(`Changes committed: ${commitHash}`); + } else { + info('No changes to commit'); + } + + if (isWorktree) { + // Clone has no origin — push to main project via path, then project pushes to origin + execFileSync('git', ['push', projectCwd, 'HEAD'], { cwd: execCwd, stdio: 'pipe' }); + } + + info(`Pushing to origin/${branch}...`); + pushBranch(projectCwd, branch); + success(`Pushed to origin/${branch}`); + return true; + } catch (err) { + error(`Git operation failed: ${getErrorMessage(err)}`); + return false; + } +} + +// ---- Step 5: Submit pull request ---- + +export function submitPullRequest( + projectCwd: string, + branch: string, + baseBranch: string | undefined, + taskContent: TaskContent, + piece: string, + pipelineConfig: PipelineConfig | undefined, + options: Pick, +): string | undefined { + info('Creating pull request...'); + const prTitle = taskContent.issue ? taskContent.issue.title : (options.task ?? 'Pipeline task'); + const report = `Piece \`${piece}\` completed successfully.`; + const prBody = buildPipelinePrBody(pipelineConfig, taskContent.issue, report); + + const prResult = createPullRequest(projectCwd, { + branch, + title: prTitle, + body: prBody, + base: baseBranch, + repo: options.repo, + draft: options.draftPr, + }); + + if (prResult.success) { + success(`PR created: ${prResult.url}`); + return prResult.url; + } + error(`PR creation failed: ${prResult.error}`); + return undefined; +}