/** * @ruvector/edge-net REAL Workflow Orchestration * * Actually functional workflow system with: * - Real LLM agent execution for each step * - Real dependency resolution * - Real parallel/sequential execution * - Real result aggregation * * @module @ruvector/edge-net/real-workflows */ import { EventEmitter } from 'events'; import { randomBytes } from 'crypto'; import { RealAgentManager, LLMClient } from './real-agents.js'; import { RealWorkerPool } from './real-workers.js'; // ============================================ // WORKFLOW STEP TYPES // ============================================ export const StepTypes = { AGENT: 'agent', // LLM agent execution WORKER: 'worker', // Worker pool execution PARALLEL: 'parallel', // Parallel sub-steps SEQUENTIAL: 'sequential', // Sequential sub-steps CONDITION: 'condition', // Conditional branching TRANSFORM: 'transform', // Data transformation AGGREGATE: 'aggregate', // Result aggregation }; // ============================================ // WORKFLOW TEMPLATES // ============================================ export const WorkflowTemplates = { 'code-review': { name: 'Code Review', description: 'Comprehensive code review with multiple agents', steps: [ { id: 'analyze', type: 'agent', agentType: 'analyst', prompt: 'Analyze the code structure and identify key components: {{input}}', }, { id: 'review-quality', type: 'agent', agentType: 'reviewer', prompt: 'Review code quality, best practices, and potential issues based on analysis: {{analyze.output}}', dependsOn: ['analyze'], }, { id: 'review-security', type: 'agent', agentType: 'reviewer', prompt: 'Review security vulnerabilities and concerns: {{input}}', }, { id: 'suggestions', type: 'agent', agentType: 'coder', prompt: 'Provide specific code improvement suggestions based on reviews:\nQuality: {{review-quality.output}}\nSecurity: {{review-security.output}}', dependsOn: ['review-quality', 'review-security'], }, ], }, 'feature-dev': { name: 'Feature Development', description: 'End-to-end feature development workflow', steps: [ { id: 'research', type: 'agent', agentType: 'researcher', prompt: 'Research requirements and best practices for: {{input}}', }, { id: 'design', type: 'agent', agentType: 'analyst', prompt: 'Design the architecture and approach based on research: {{research.output}}', dependsOn: ['research'], }, { id: 'implement', type: 'agent', agentType: 'coder', prompt: 'Implement the feature based on design: {{design.output}}', dependsOn: ['design'], }, { id: 'test', type: 'agent', agentType: 'tester', prompt: 'Write tests for the implementation: {{implement.output}}', dependsOn: ['implement'], }, { id: 'review', type: 'agent', agentType: 'reviewer', prompt: 'Final review of implementation and tests:\nCode: {{implement.output}}\nTests: {{test.output}}', dependsOn: ['implement', 'test'], }, ], }, 'bug-fix': { name: 'Bug Fix', description: 'Systematic bug investigation and fix workflow', steps: [ { id: 'investigate', type: 'agent', agentType: 'analyst', prompt: 'Investigate the bug and identify root cause: {{input}}', }, { id: 'fix', type: 'agent', agentType: 'coder', prompt: 'Implement the fix for: {{investigate.output}}', dependsOn: ['investigate'], }, { id: 'test', type: 'agent', agentType: 'tester', prompt: 'Write regression tests to prevent recurrence: {{fix.output}}', dependsOn: ['fix'], }, { id: 'verify', type: 'agent', agentType: 'reviewer', prompt: 'Verify the fix is complete and correct:\nFix: {{fix.output}}\nTests: {{test.output}}', dependsOn: ['fix', 'test'], }, ], }, 'optimization': { name: 'Performance Optimization', description: 'Performance analysis and optimization workflow', steps: [ { id: 'profile', type: 'agent', agentType: 'optimizer', prompt: 'Profile and identify performance bottlenecks: {{input}}', }, { id: 'analyze', type: 'agent', agentType: 'analyst', prompt: 'Analyze profiling results and prioritize optimizations: {{profile.output}}', dependsOn: ['profile'], }, { id: 'optimize', type: 'agent', agentType: 'coder', prompt: 'Implement optimizations based on analysis: {{analyze.output}}', dependsOn: ['analyze'], }, { id: 'benchmark', type: 'agent', agentType: 'tester', prompt: 'Benchmark optimized code and compare: {{optimize.output}}', dependsOn: ['optimize'], }, ], }, 'research': { name: 'Research', description: 'Deep research and analysis workflow', steps: [ { id: 'gather', type: 'agent', agentType: 'researcher', prompt: 'Gather information and sources on: {{input}}', }, { id: 'analyze', type: 'agent', agentType: 'analyst', prompt: 'Analyze gathered information: {{gather.output}}', dependsOn: ['gather'], }, { id: 'synthesize', type: 'agent', agentType: 'researcher', prompt: 'Synthesize findings into actionable insights: {{analyze.output}}', dependsOn: ['analyze'], }, ], }, }; // ============================================ // WORKFLOW STEP // ============================================ class WorkflowStep { constructor(config) { this.id = config.id; this.type = config.type || StepTypes.AGENT; this.agentType = config.agentType; this.prompt = config.prompt; this.dependsOn = config.dependsOn || []; this.options = config.options || {}; this.subSteps = config.subSteps || []; this.condition = config.condition; this.transform = config.transform; this.status = 'pending'; this.output = null; this.error = null; this.startTime = null; this.endTime = null; } /** * Interpolate template variables */ interpolate(template, context) { return template.replace(/\{\{(\w+(?:\.\w+)?)\}\}/g, (match, path) => { const parts = path.split('.'); let value = context; for (const part of parts) { if (value && typeof value === 'object') { value = value[part]; } else { return match; // Keep original if not found } } if (typeof value === 'object') { return JSON.stringify(value, null, 2); } return value !== undefined ? String(value) : match; }); } getInfo() { return { id: this.id, type: this.type, status: this.status, duration: this.endTime && this.startTime ? this.endTime - this.startTime : null, dependsOn: this.dependsOn, hasOutput: this.output !== null, error: this.error, }; } } // ============================================ // REAL WORKFLOW ORCHESTRATOR // ============================================ /** * Real workflow orchestrator with actual LLM execution */ export class RealWorkflowOrchestrator extends EventEmitter { constructor(options = {}) { super(); this.agentManager = null; this.workerPool = null; this.workflows = new Map(); this.options = options; this.stats = { workflowsCompleted: 0, workflowsFailed: 0, stepsExecuted: 0, totalDuration: 0, }; } /** * Initialize orchestrator */ async initialize() { // Initialize agent manager for LLM execution this.agentManager = new RealAgentManager({ provider: this.options.provider || 'anthropic', apiKey: this.options.apiKey, }); await this.agentManager.initialize(); // Initialize worker pool for compute tasks this.workerPool = new RealWorkerPool({ size: 4 }); await this.workerPool.initialize(); return this; } /** * Create workflow from template or custom definition */ createWorkflow(nameOrConfig, customTask = null) { let config; if (typeof nameOrConfig === 'string') { const template = WorkflowTemplates[nameOrConfig]; if (!template) { throw new Error(`Unknown workflow template: ${nameOrConfig}`); } config = { ...template, input: customTask, }; } else { config = nameOrConfig; } const workflow = { id: `wf-${randomBytes(6).toString('hex')}`, name: config.name, description: config.description, input: config.input, steps: config.steps.map(s => new WorkflowStep(s)), status: 'created', results: {}, startTime: null, endTime: null, error: null, }; this.workflows.set(workflow.id, workflow); this.emit('workflow-created', { workflowId: workflow.id, name: workflow.name }); return workflow; } /** * Execute a workflow */ async executeWorkflow(workflowId) { const workflow = this.workflows.get(workflowId); if (!workflow) { throw new Error(`Workflow not found: ${workflowId}`); } workflow.status = 'running'; workflow.startTime = Date.now(); workflow.results = { input: workflow.input }; this.emit('workflow-start', { workflowId, name: workflow.name }); try { // Build dependency graph const graph = this.buildDependencyGraph(workflow.steps); // Execute steps respecting dependencies await this.executeSteps(workflow, graph); workflow.status = 'completed'; workflow.endTime = Date.now(); const duration = workflow.endTime - workflow.startTime; this.stats.workflowsCompleted++; this.stats.totalDuration += duration; this.emit('workflow-complete', { workflowId, duration, results: workflow.results, }); return { workflowId, status: 'completed', duration, results: workflow.results, steps: workflow.steps.map(s => s.getInfo()), }; } catch (error) { workflow.status = 'failed'; workflow.error = error.message; workflow.endTime = Date.now(); this.stats.workflowsFailed++; this.emit('workflow-error', { workflowId, error: error.message }); throw error; } } /** * Build dependency graph */ buildDependencyGraph(steps) { const graph = new Map(); const stepMap = new Map(); for (const step of steps) { stepMap.set(step.id, step); graph.set(step.id, new Set(step.dependsOn)); } return { graph, stepMap }; } /** * Execute steps respecting dependencies */ async executeSteps(workflow, { graph, stepMap }) { const completed = new Set(); const running = new Map(); const isReady = (stepId) => { const deps = graph.get(stepId); return [...deps].every(d => completed.has(d)); }; const getReadySteps = () => { const ready = []; for (const [stepId, deps] of graph) { if (!completed.has(stepId) && !running.has(stepId) && isReady(stepId)) { ready.push(stepMap.get(stepId)); } } return ready; }; while (completed.size < stepMap.size) { const readySteps = getReadySteps(); if (readySteps.length === 0 && running.size === 0) { throw new Error('Workflow deadlock: no steps ready and none running'); } // Execute ready steps in parallel for (const step of readySteps) { const promise = this.executeStep(step, workflow.results) .then(result => { workflow.results[step.id] = { output: result }; completed.add(step.id); running.delete(step.id); this.stats.stepsExecuted++; }) .catch(error => { step.error = error.message; step.status = 'failed'; throw error; }); running.set(step.id, promise); } // Wait for at least one to complete if (running.size > 0) { await Promise.race(running.values()); } } } /** * Execute a single step */ async executeStep(step, context) { step.status = 'running'; step.startTime = Date.now(); this.emit('step-start', { stepId: step.id, type: step.type }); try { let result; switch (step.type) { case StepTypes.AGENT: result = await this.executeAgentStep(step, context); break; case StepTypes.WORKER: result = await this.executeWorkerStep(step, context); break; case StepTypes.PARALLEL: result = await this.executeParallelStep(step, context); break; case StepTypes.SEQUENTIAL: result = await this.executeSequentialStep(step, context); break; case StepTypes.TRANSFORM: result = await this.executeTransformStep(step, context); break; case StepTypes.CONDITION: result = await this.executeConditionStep(step, context); break; case StepTypes.AGGREGATE: result = await this.executeAggregateStep(step, context); break; default: throw new Error(`Unknown step type: ${step.type}`); } step.output = result; step.status = 'completed'; step.endTime = Date.now(); this.emit('step-complete', { stepId: step.id, duration: step.endTime - step.startTime, }); return result; } catch (error) { step.status = 'failed'; step.error = error.message; step.endTime = Date.now(); this.emit('step-error', { stepId: step.id, error: error.message }); throw error; } } /** * Execute agent step with real LLM */ async executeAgentStep(step, context) { const prompt = step.interpolate(step.prompt, context); const result = await this.agentManager.quickExecute( step.agentType || 'coder', prompt, { model: step.options.model || 'balanced', ...step.options, } ); return result.content; } /** * Execute worker step */ async executeWorkerStep(step, context) { const data = step.interpolate( JSON.stringify(step.options.data || context.input), context ); return this.workerPool.execute( step.options.taskType || 'process', JSON.parse(data), step.options ); } /** * Execute parallel sub-steps */ async executeParallelStep(step, context) { const subSteps = step.subSteps.map(s => new WorkflowStep(s)); const promises = subSteps.map(s => this.executeStep(s, context)); const results = await Promise.all(promises); return results.reduce((acc, result, i) => { acc[subSteps[i].id] = result; return acc; }, {}); } /** * Execute sequential sub-steps */ async executeSequentialStep(step, context) { const subSteps = step.subSteps.map(s => new WorkflowStep(s)); const results = {}; for (const subStep of subSteps) { results[subStep.id] = await this.executeStep(subStep, { ...context, ...results }); } return results; } /** * Execute transform step */ async executeTransformStep(step, context) { const inputKey = step.options.input || 'input'; const input = context[inputKey]?.output || context[inputKey] || context.input; if (step.transform) { // Custom transform function as string const fn = new Function('input', 'context', step.transform); return fn(input, context); } // Default transforms const transformType = step.options.transformType || 'identity'; switch (transformType) { case 'json': return JSON.parse(input); case 'stringify': return JSON.stringify(input); case 'extract': return input[step.options.key]; default: return input; } } /** * Execute condition step */ async executeConditionStep(step, context) { const condition = step.interpolate(step.condition, context); // Evaluate condition const fn = new Function('context', `return ${condition}`); const result = fn(context); if (result && step.options.then) { const thenStep = new WorkflowStep(step.options.then); return this.executeStep(thenStep, context); } else if (!result && step.options.else) { const elseStep = new WorkflowStep(step.options.else); return this.executeStep(elseStep, context); } return result; } /** * Execute aggregate step */ async executeAggregateStep(step, context) { const keys = step.options.keys || Object.keys(context).filter(k => k !== 'input'); const aggregated = {}; for (const key of keys) { if (context[key]) { aggregated[key] = context[key].output || context[key]; } } if (step.options.format === 'summary') { return Object.entries(aggregated) .map(([k, v]) => `## ${k}\n${typeof v === 'string' ? v : JSON.stringify(v, null, 2)}`) .join('\n\n'); } return aggregated; } /** * Run workflow by template name */ async run(templateName, input, options = {}) { const workflow = this.createWorkflow(templateName, input); return this.executeWorkflow(workflow.id); } /** * Run custom workflow */ async runCustom(config) { const workflow = this.createWorkflow(config); return this.executeWorkflow(workflow.id); } /** * Get workflow status */ getWorkflow(workflowId) { const workflow = this.workflows.get(workflowId); if (!workflow) return null; return { id: workflow.id, name: workflow.name, status: workflow.status, steps: workflow.steps.map(s => s.getInfo()), duration: workflow.endTime && workflow.startTime ? workflow.endTime - workflow.startTime : null, error: workflow.error, }; } /** * Get orchestrator stats */ getStats() { return { ...this.stats, activeWorkflows: [...this.workflows.values()] .filter(w => w.status === 'running').length, agentManager: this.agentManager?.listAgents()?.length || 0, workerPool: this.workerPool?.getStatus(), }; } /** * Shutdown orchestrator */ async shutdown() { if (this.agentManager) { await this.agentManager.close(); } if (this.workerPool) { await this.workerPool.shutdown(); } } // Alias for shutdown async close() { return this.shutdown(); } } // Export WorkflowStep (not exported with export class) export { WorkflowStep }; // Default export export default RealWorkflowOrchestrator;