740 lines
22 KiB
JavaScript
740 lines
22 KiB
JavaScript
/**
|
|
* @ruvector/edge-net REAL Workflow Orchestration
|
|
*
|
|
* Actually functional workflow system with:
|
|
* - Real LLM agent execution for each step
|
|
* - Real dependency resolution
|
|
* - Real parallel/sequential execution
|
|
* - Real result aggregation
|
|
*
|
|
* @module @ruvector/edge-net/real-workflows
|
|
*/
|
|
|
|
import { EventEmitter } from 'events';
|
|
import { randomBytes } from 'crypto';
|
|
import { RealAgentManager, LLMClient } from './real-agents.js';
|
|
import { RealWorkerPool } from './real-workers.js';
|
|
|
|
// ============================================
|
|
// WORKFLOW STEP TYPES
|
|
// ============================================
|
|
|
|
/**
 * Identifiers for the kinds of step a workflow may contain.
 *
 * `RealWorkflowOrchestrator.executeStep` switches on these values, and
 * `WorkflowStep` falls back to `AGENT` when a step definition has no `type`.
 * The table is frozen so that this shared lookup cannot be altered at runtime.
 */
export const StepTypes = Object.freeze({
  AGENT: 'agent', // LLM agent execution
  WORKER: 'worker', // Worker pool execution
  PARALLEL: 'parallel', // Parallel sub-steps
  SEQUENTIAL: 'sequential', // Sequential sub-steps
  CONDITION: 'condition', // Conditional branching
  TRANSFORM: 'transform', // Data transformation
  AGGREGATE: 'aggregate', // Result aggregation
});
|
|
|
|
// ============================================
|
|
// WORKFLOW TEMPLATES
|
|
// ============================================
|
|
|
|
/**
 * Build the definition of one LLM agent step.
 *
 * The `dependsOn` key is only present when prerequisites are supplied, so
 * steps without dependencies carry no `dependsOn` property at all.
 *
 * @param {string} id - Step identifier, unique within its template.
 * @param {string} agentType - Agent role that runs the step.
 * @param {string} prompt - Prompt text; may contain `{{...}}` placeholders.
 * @param {string[]} [dependsOn] - Ids of steps that must finish first.
 * @returns {object} Plain step definition.
 */
const agentStep = (id, agentType, prompt, dependsOn) => {
  const definition = { id, type: 'agent', agentType, prompt };
  if (dependsOn) {
    definition.dependsOn = dependsOn;
  }
  return definition;
};

/**
 * Built-in workflow definitions, keyed by template name.
 *
 * Each template has a display `name`, a `description` and an ordered list of
 * agent `steps`. Prompts reference the workflow input as `{{input}}` and the
 * result of an earlier step as `{{<stepId>.output}}`.
 */
export const WorkflowTemplates = {
  'code-review': {
    name: 'Code Review',
    description: 'Comprehensive code review with multiple agents',
    steps: [
      agentStep(
        'analyze',
        'analyst',
        'Analyze the code structure and identify key components: {{input}}',
      ),
      agentStep(
        'review-quality',
        'reviewer',
        'Review code quality, best practices, and potential issues based on analysis: {{analyze.output}}',
        ['analyze'],
      ),
      agentStep(
        'review-security',
        'reviewer',
        'Review security vulnerabilities and concerns: {{input}}',
      ),
      agentStep(
        'suggestions',
        'coder',
        'Provide specific code improvement suggestions based on reviews:\nQuality: {{review-quality.output}}\nSecurity: {{review-security.output}}',
        ['review-quality', 'review-security'],
      ),
    ],
  },

  'feature-dev': {
    name: 'Feature Development',
    description: 'End-to-end feature development workflow',
    steps: [
      agentStep(
        'research',
        'researcher',
        'Research requirements and best practices for: {{input}}',
      ),
      agentStep(
        'design',
        'analyst',
        'Design the architecture and approach based on research: {{research.output}}',
        ['research'],
      ),
      agentStep(
        'implement',
        'coder',
        'Implement the feature based on design: {{design.output}}',
        ['design'],
      ),
      agentStep(
        'test',
        'tester',
        'Write tests for the implementation: {{implement.output}}',
        ['implement'],
      ),
      agentStep(
        'review',
        'reviewer',
        'Final review of implementation and tests:\nCode: {{implement.output}}\nTests: {{test.output}}',
        ['implement', 'test'],
      ),
    ],
  },

  'bug-fix': {
    name: 'Bug Fix',
    description: 'Systematic bug investigation and fix workflow',
    steps: [
      agentStep(
        'investigate',
        'analyst',
        'Investigate the bug and identify root cause: {{input}}',
      ),
      agentStep(
        'fix',
        'coder',
        'Implement the fix for: {{investigate.output}}',
        ['investigate'],
      ),
      agentStep(
        'test',
        'tester',
        'Write regression tests to prevent recurrence: {{fix.output}}',
        ['fix'],
      ),
      agentStep(
        'verify',
        'reviewer',
        'Verify the fix is complete and correct:\nFix: {{fix.output}}\nTests: {{test.output}}',
        ['fix', 'test'],
      ),
    ],
  },

  'optimization': {
    name: 'Performance Optimization',
    description: 'Performance analysis and optimization workflow',
    steps: [
      agentStep(
        'profile',
        'optimizer',
        'Profile and identify performance bottlenecks: {{input}}',
      ),
      agentStep(
        'analyze',
        'analyst',
        'Analyze profiling results and prioritize optimizations: {{profile.output}}',
        ['profile'],
      ),
      agentStep(
        'optimize',
        'coder',
        'Implement optimizations based on analysis: {{analyze.output}}',
        ['analyze'],
      ),
      agentStep(
        'benchmark',
        'tester',
        'Benchmark optimized code and compare: {{optimize.output}}',
        ['optimize'],
      ),
    ],
  },

  'research': {
    name: 'Research',
    description: 'Deep research and analysis workflow',
    steps: [
      agentStep(
        'gather',
        'researcher',
        'Gather information and sources on: {{input}}',
      ),
      agentStep(
        'analyze',
        'analyst',
        'Analyze gathered information: {{gather.output}}',
        ['gather'],
      ),
      agentStep(
        'synthesize',
        'researcher',
        'Synthesize findings into actionable insights: {{analyze.output}}',
        ['analyze'],
      ),
    ],
  },
};
|
|
|
|
// ============================================
|
|
// WORKFLOW STEP
|
|
// ============================================
|
|
|
|
/**
 * One unit of work inside a workflow: its static definition plus the
 * runtime state (status, output, error, timing) recorded while it executes.
 */
class WorkflowStep {
  /**
   * @param {object} config - Step definition.
   * @param {string} config.id - Identifier used as the key for this step's result.
   * @param {string} [config.type] - One of `StepTypes`; defaults to `StepTypes.AGENT`.
   * @param {string} [config.agentType] - Agent role for agent steps.
   * @param {string} [config.prompt] - Prompt template for agent steps.
   * @param {string[]} [config.dependsOn] - Ids of steps that must complete first.
   * @param {object} [config.options] - Type-specific options.
   * @param {object[]} [config.subSteps] - Child definitions for parallel/sequential steps.
   * @param {string} [config.condition] - Expression source for condition steps.
   * @param {string} [config.transform] - Function body source for transform steps.
   */
  constructor(config) {
    this.id = config.id;
    this.type = config.type || StepTypes.AGENT;
    this.agentType = config.agentType;
    this.prompt = config.prompt;
    this.dependsOn = config.dependsOn || [];
    this.options = config.options || {};
    this.subSteps = config.subSteps || [];
    this.condition = config.condition;
    this.transform = config.transform;

    // Runtime state, updated by the orchestrator while the step executes.
    this.status = 'pending';
    this.output = null;
    this.error = null;
    this.startTime = null;
    this.endTime = null;
  }

  /**
   * Replace `{{path}}` placeholders in a template with values from `context`.
   *
   * A path is one or more dot-separated segments; each segment may contain
   * word characters and hyphens, so ids such as `review-quality` resolve.
   * Object values are rendered as pretty-printed JSON, other values via
   * `String()`. A placeholder whose path cannot be resolved is left as-is.
   *
   * @param {string} template - Text containing zero or more placeholders.
   * @param {object} context - Values to look up, e.g. `{ input, stepId: { output } }`.
   * @returns {string} The template with every resolvable placeholder substituted.
   */
  interpolate(template, context) {
    // `[\w-]+` (not `\w+`) so hyphenated step ids match; `*` allows nested paths.
    return template.replace(/\{\{([\w-]+(?:\.[\w-]+)*)\}\}/g, (placeholder, path) => {
      let value = context;

      for (const segment of path.split('.')) {
        if (value === null || typeof value !== 'object') {
          return placeholder; // Cannot descend further: keep the original text
        }
        value = value[segment];
      }

      if (value === undefined) {
        return placeholder;
      }

      // Note: `null` is typeof 'object' and is therefore rendered as "null".
      if (typeof value === 'object') {
        return JSON.stringify(value, null, 2);
      }

      return String(value);
    });
  }

  /**
   * Summarize the step's current state without exposing its output.
   *
   * @returns {{id: string, type: string, status: string, duration: (number|null),
   *   dependsOn: string[], hasOutput: boolean, error: (string|null)}}
   *   `duration` is in milliseconds, or `null` until both timestamps are set.
   */
  getInfo() {
    return {
      id: this.id,
      type: this.type,
      status: this.status,
      duration: this.endTime && this.startTime ? this.endTime - this.startTime : null,
      dependsOn: this.dependsOn,
      hasOutput: this.output !== null,
      error: this.error,
    };
  }
}
|
|
|
|
// ============================================
|
|
// REAL WORKFLOW ORCHESTRATOR
|
|
// ============================================
|
|
|
|
/**
 * Workflow orchestrator that runs step graphs against an agent manager
 * (LLM calls) and a worker pool (compute tasks).
 *
 * Events emitted: `workflow-created`, `workflow-start`, `workflow-complete`,
 * `workflow-error`, `step-start`, `step-complete`, `step-error`.
 */
export class RealWorkflowOrchestrator extends EventEmitter {
  /**
   * @param {object} [options] - Orchestrator options.
   * @param {string} [options.provider] - LLM provider passed to the agent manager
   *   (defaults to `'anthropic'` in `initialize`).
   * @param {string} [options.apiKey] - API key passed to the agent manager.
   */
  constructor(options = {}) {
    super();
    this.agentManager = null;
    this.workerPool = null;
    this.workflows = new Map();
    this.options = options;

    this.stats = {
      workflowsCompleted: 0,
      workflowsFailed: 0,
      stepsExecuted: 0,
      totalDuration: 0,
    };
  }

  /**
   * Create and initialize the agent manager and the worker pool.
   * Must be awaited before executing agent or worker steps.
   *
   * @returns {Promise<RealWorkflowOrchestrator>} This instance, for chaining.
   */
  async initialize() {
    // Agent manager performs the LLM calls for agent steps.
    this.agentManager = new RealAgentManager({
      provider: this.options.provider || 'anthropic',
      apiKey: this.options.apiKey,
    });
    await this.agentManager.initialize();

    // Worker pool handles compute tasks for worker steps.
    this.workerPool = new RealWorkerPool({ size: 4 });
    await this.workerPool.initialize();

    return this;
  }

  /**
   * Register a new workflow from a built-in template name or a custom config.
   *
   * @param {string|object} nameOrConfig - Key of `WorkflowTemplates`, or a config
   *   object with `name`, `description`, `input` and `steps`.
   * @param {*} [customTask=null] - Workflow input; only used with a template name.
   * @returns {object} The stored workflow record (also kept in `this.workflows`).
   * @throws {Error} If the template name is unknown or the config has no `steps` array.
   */
  createWorkflow(nameOrConfig, customTask = null) {
    let config;

    if (typeof nameOrConfig === 'string') {
      // Own-property check so inherited keys such as "toString" are not
      // mistaken for templates.
      const isTemplate = Object.prototype.hasOwnProperty.call(WorkflowTemplates, nameOrConfig);
      if (!isTemplate) {
        throw new Error(`Unknown workflow template: ${nameOrConfig}`);
      }
      config = {
        ...WorkflowTemplates[nameOrConfig],
        input: customTask,
      };
    } else {
      config = nameOrConfig;
    }

    if (!Array.isArray(config?.steps)) {
      throw new Error('Workflow config must include a "steps" array');
    }

    const workflow = {
      id: `wf-${randomBytes(6).toString('hex')}`,
      name: config.name,
      description: config.description,
      input: config.input,
      steps: config.steps.map((definition) => new WorkflowStep(definition)),
      status: 'created',
      results: {},
      startTime: null,
      endTime: null,
      error: null,
    };

    this.workflows.set(workflow.id, workflow);
    this.emit('workflow-created', { workflowId: workflow.id, name: workflow.name });

    return workflow;
  }

  /**
   * Execute a previously created workflow.
   *
   * @param {string} workflowId - Id returned by `createWorkflow`.
   * @returns {Promise<{workflowId: string, status: string, duration: number,
   *   results: object, steps: object[]}>} Summary of the completed run.
   * @throws {Error} If the workflow does not exist, or rethrows the error of
   *   the step that failed (after marking the workflow as failed).
   */
  async executeWorkflow(workflowId) {
    const workflow = this.workflows.get(workflowId);
    if (!workflow) {
      throw new Error(`Workflow not found: ${workflowId}`);
    }

    workflow.status = 'running';
    workflow.startTime = Date.now();
    // `results` doubles as the interpolation context, hence the `input` key.
    workflow.results = { input: workflow.input };

    this.emit('workflow-start', { workflowId, name: workflow.name });

    try {
      const graph = this.buildDependencyGraph(workflow.steps);
      await this.executeSteps(workflow, graph);

      workflow.status = 'completed';
      workflow.endTime = Date.now();

      const duration = workflow.endTime - workflow.startTime;
      this.stats.workflowsCompleted++;
      this.stats.totalDuration += duration;

      this.emit('workflow-complete', {
        workflowId,
        duration,
        results: workflow.results,
      });

      return {
        workflowId,
        status: 'completed',
        duration,
        results: workflow.results,
        steps: workflow.steps.map((step) => step.getInfo()),
      };
    } catch (error) {
      workflow.status = 'failed';
      workflow.error = error.message;
      workflow.endTime = Date.now();

      this.stats.workflowsFailed++;

      this.emit('workflow-error', { workflowId, error: error.message });

      throw error;
    }
  }

  /**
   * Index steps by id and collect each step's dependency set.
   *
   * @param {Array<{id: string, dependsOn: string[]}>} steps - Steps to index.
   * @returns {{graph: Map<string, Set<string>>, stepMap: Map<string, object>}}
   *   `graph` maps step id to the ids it depends on; `stepMap` maps id to step.
   * @throws {Error} If a step depends on an id that is not part of `steps`.
   */
  buildDependencyGraph(steps) {
    const graph = new Map();
    const stepMap = new Map();

    for (const step of steps) {
      stepMap.set(step.id, step);
      graph.set(step.id, new Set(step.dependsOn));
    }

    // Fail fast: an unknown dependency can never be satisfied and would
    // otherwise only surface as a "deadlock" after other steps have run.
    for (const [stepId, dependencies] of graph) {
      for (const dependency of dependencies) {
        if (!stepMap.has(dependency)) {
          throw new Error(`Step "${stepId}" depends on unknown step "${dependency}"`);
        }
      }
    }

    return { graph, stepMap };
  }

  /**
   * Run every step of a workflow, starting each one as soon as all of its
   * dependencies have completed. Independent steps run concurrently.
   * Each result is stored as `workflow.results[stepId] = { output }`.
   *
   * @param {object} workflow - Workflow record; its `results` object is mutated.
   * @param {{graph: Map<string, Set<string>>, stepMap: Map<string, object>}} dependencyGraph
   *   Value returned by `buildDependencyGraph`.
   * @returns {Promise<void>}
   * @throws {Error} On a dependency cycle (deadlock) or when any step fails.
   *   Steps already in flight when another step fails are not cancelled.
   */
  async executeSteps(workflow, { graph, stepMap }) {
    const completed = new Set();
    const running = new Map();

    const isReady = (stepId) => [...graph.get(stepId)].every((dep) => completed.has(dep));

    const getReadySteps = () =>
      [...graph.keys()]
        .filter((stepId) => !completed.has(stepId) && !running.has(stepId) && isReady(stepId))
        .map((stepId) => stepMap.get(stepId));

    while (completed.size < stepMap.size) {
      const readySteps = getReadySteps();

      if (readySteps.length === 0 && running.size === 0) {
        throw new Error('Workflow deadlock: no steps ready and none running');
      }

      // Launch everything that is ready; bookkeeping happens in the
      // continuation so it is done before the race below resumes.
      for (const step of readySteps) {
        const promise = this.executeStep(step, workflow.results)
          .then((result) => {
            workflow.results[step.id] = { output: result };
            completed.add(step.id);
            running.delete(step.id);
            this.stats.stepsExecuted++;
          })
          .catch((error) => {
            step.error = error.message;
            step.status = 'failed';
            throw error;
          });

        running.set(step.id, promise);
      }

      // Wait until at least one running step settles, then re-evaluate.
      if (running.size > 0) {
        await Promise.race(running.values());
      }
    }
  }

  /**
   * Execute one step, dispatching on its type and recording status, timing,
   * output and error on the step object.
   *
   * @param {object} step - Step to run (a `WorkflowStep`).
   * @param {object} context - Interpolation context: `input` plus earlier results.
   * @returns {Promise<*>} The step's result.
   * @throws {Error} If the step type is unknown or the step's handler fails.
   */
  async executeStep(step, context) {
    step.status = 'running';
    step.startTime = Date.now();

    this.emit('step-start', { stepId: step.id, type: step.type });

    try {
      let result;

      switch (step.type) {
        case StepTypes.AGENT:
          result = await this.executeAgentStep(step, context);
          break;

        case StepTypes.WORKER:
          result = await this.executeWorkerStep(step, context);
          break;

        case StepTypes.PARALLEL:
          result = await this.executeParallelStep(step, context);
          break;

        case StepTypes.SEQUENTIAL:
          result = await this.executeSequentialStep(step, context);
          break;

        case StepTypes.TRANSFORM:
          result = await this.executeTransformStep(step, context);
          break;

        case StepTypes.CONDITION:
          result = await this.executeConditionStep(step, context);
          break;

        case StepTypes.AGGREGATE:
          result = await this.executeAggregateStep(step, context);
          break;

        default:
          throw new Error(`Unknown step type: ${step.type}`);
      }

      step.output = result;
      step.status = 'completed';
      step.endTime = Date.now();

      this.emit('step-complete', {
        stepId: step.id,
        duration: step.endTime - step.startTime,
      });

      return result;
    } catch (error) {
      step.status = 'failed';
      step.error = error.message;
      step.endTime = Date.now();

      this.emit('step-error', { stepId: step.id, error: error.message });
      throw error;
    }
  }

  /**
   * Run an agent step: interpolate its prompt and send it to the agent manager.
   *
   * @param {object} step - Agent step; `agentType` defaults to `'coder'`.
   * @param {object} context - Interpolation context.
   * @returns {Promise<*>} The `content` field of the agent manager's response.
   */
  async executeAgentStep(step, context) {
    const prompt = step.interpolate(step.prompt, context);

    const response = await this.agentManager.quickExecute(
      step.agentType || 'coder',
      prompt,
      {
        ...step.options,
        // Applied last so a missing or empty `model` still gets the default.
        model: step.options.model || 'balanced',
      }
    );

    return response.content;
  }

  /**
   * Run a worker step: interpolate placeholders inside the step's data and
   * hand it to the worker pool.
   *
   * The data (`options.data`, or the workflow input when absent) is round-tripped
   * through JSON; every string value is interpolated after parsing, so
   * substituted text containing quotes or newlines cannot corrupt the JSON.
   *
   * @param {object} step - Worker step; `options.taskType` defaults to `'process'`.
   * @param {object} context - Interpolation context.
   * @returns {Promise<*>} Whatever the worker pool's `execute` resolves to.
   */
  async executeWorkerStep(step, context) {
    const serialized = JSON.stringify(step.options.data || context.input);
    const data = JSON.parse(serialized, (key, value) =>
      (typeof value === 'string' ? step.interpolate(value, context) : value));

    return this.workerPool.execute(
      step.options.taskType || 'process',
      data,
      step.options
    );
  }

  /**
   * Run all sub-steps concurrently with the same context.
   *
   * @param {object} step - Parallel step with `subSteps` definitions.
   * @param {object} context - Interpolation context shared by all sub-steps.
   * @returns {Promise<object>} Results keyed by sub-step id.
   */
  async executeParallelStep(step, context) {
    const subSteps = step.subSteps.map((definition) => new WorkflowStep(definition));
    const outputs = await Promise.all(
      subSteps.map((subStep) => this.executeStep(subStep, context))
    );

    return Object.fromEntries(subSteps.map((subStep, index) => [subStep.id, outputs[index]]));
  }

  /**
   * Run sub-steps one after another. Each sub-step sees the outer context
   * merged with the raw results of the sub-steps that ran before it.
   *
   * @param {object} step - Sequential step with `subSteps` definitions.
   * @param {object} context - Interpolation context.
   * @returns {Promise<object>} Results keyed by sub-step id.
   */
  async executeSequentialStep(step, context) {
    const subSteps = step.subSteps.map((definition) => new WorkflowStep(definition));
    const results = {};

    for (const subStep of subSteps) {
      results[subStep.id] = await this.executeStep(subStep, { ...context, ...results });
    }

    return results;
  }

  /**
   * Transform a value taken from the context.
   *
   * The source value is `context[options.input].output`, falling back to
   * `context[options.input]` and then `context.input`. Only `null`/`undefined`
   * trigger a fallback, so falsy outputs such as `0` or `''` are kept.
   *
   * @param {object} step - Transform step. `step.transform` is a function body
   *   (string) receiving `input` and `context`; otherwise `options.transformType`
   *   selects `'json'`, `'stringify'`, `'extract'` (with `options.key`) or identity.
   * @param {object} context - Interpolation context.
   * @returns {Promise<*>} The transformed value.
   */
  async executeTransformStep(step, context) {
    const inputKey = step.options.input || 'input';
    const input = context[inputKey]?.output ?? context[inputKey] ?? context.input;

    if (step.transform) {
      // SECURITY NOTE(review): this compiles and runs arbitrary code from the
      // workflow definition. Only use with trusted workflow configs.
      const fn = new Function('input', 'context', step.transform);
      return fn(input, context);
    }

    const transformType = step.options.transformType || 'identity';
    switch (transformType) {
      case 'json':
        return JSON.parse(input);
      case 'stringify':
        return JSON.stringify(input);
      case 'extract':
        return input[step.options.key];
      default:
        return input;
    }
  }

  /**
   * Evaluate a condition expression and optionally run a branch step.
   *
   * @param {object} step - Condition step. `step.condition` is an expression
   *   (string) evaluated with `context` in scope after placeholder interpolation;
   *   `options.then` / `options.else` are optional step definitions.
   * @param {object} context - Interpolation context.
   * @returns {Promise<*>} The branch step's result, or the raw condition value
   *   when no matching branch is defined.
   */
  async executeConditionStep(step, context) {
    const condition = step.interpolate(step.condition, context);

    // SECURITY NOTE(review): interpolated values (including LLM output) are
    // spliced into source code and executed. Only use with trusted inputs.
    const fn = new Function('context', `return ${condition}`);
    const result = fn(context);

    if (result && step.options.then) {
      const thenStep = new WorkflowStep(step.options.then);
      return this.executeStep(thenStep, context);
    } else if (!result && step.options.else) {
      const elseStep = new WorkflowStep(step.options.else);
      return this.executeStep(elseStep, context);
    }

    return result;
  }

  /**
   * Collect results from the context into a single object or summary text.
   *
   * @param {object} step - Aggregate step. `options.keys` lists the context
   *   keys to include (default: every key except `input`); `options.format`
   *   set to `'summary'` returns text with one `## key` section per entry.
   * @param {object} context - Interpolation context.
   * @returns {Promise<object|string>} Aggregated values keyed by context key, or
   *   the summary string. Missing (`null`/`undefined`) entries are skipped;
   *   falsy outputs such as `0` or `''` are kept.
   */
  async executeAggregateStep(step, context) {
    const keys = step.options.keys || Object.keys(context).filter((key) => key !== 'input');
    const aggregated = {};

    for (const key of keys) {
      const entry = context[key];
      if (entry === undefined || entry === null) {
        continue;
      }
      aggregated[key] = entry.output ?? entry;
    }

    if (step.options.format === 'summary') {
      return Object.entries(aggregated)
        .map(([k, v]) => `## ${k}\n${typeof v === 'string' ? v : JSON.stringify(v, null, 2)}`)
        .join('\n\n');
    }

    return aggregated;
  }

  /**
   * Create a workflow from a template and execute it.
   *
   * @param {string} templateName - Key of `WorkflowTemplates`.
   * @param {*} input - Workflow input, available to prompts as `{{input}}`.
   * @param {object} [options] - Currently unused; kept for interface compatibility.
   * @returns {Promise<object>} Result of `executeWorkflow`.
   */
  async run(templateName, input, options = {}) {
    const workflow = this.createWorkflow(templateName, input);
    return this.executeWorkflow(workflow.id);
  }

  /**
   * Create a workflow from a custom config and execute it.
   *
   * @param {object} config - Workflow config (see `createWorkflow`).
   * @returns {Promise<object>} Result of `executeWorkflow`.
   */
  async runCustom(config) {
    const workflow = this.createWorkflow(config);
    return this.executeWorkflow(workflow.id);
  }

  /**
   * Get a status snapshot of a workflow.
   *
   * @param {string} workflowId - Id returned by `createWorkflow`.
   * @returns {object|null} Snapshot with id, name, status, step infos,
   *   duration (ms or `null`) and error; `null` if the id is unknown.
   */
  getWorkflow(workflowId) {
    const workflow = this.workflows.get(workflowId);
    if (!workflow) return null;

    return {
      id: workflow.id,
      name: workflow.name,
      status: workflow.status,
      steps: workflow.steps.map((step) => step.getInfo()),
      duration: workflow.endTime && workflow.startTime
        ? workflow.endTime - workflow.startTime
        : null,
      error: workflow.error,
    };
  }

  /**
   * Get orchestrator counters plus live information.
   *
   * @returns {object} The `stats` counters, the number of workflows currently
   *   running, the number of agents listed by the agent manager (0 when not
   *   initialized) and the worker pool status (undefined when not initialized).
   */
  getStats() {
    return {
      ...this.stats,
      activeWorkflows: [...this.workflows.values()]
        .filter((workflow) => workflow.status === 'running').length,
      agentManager: this.agentManager?.listAgents()?.length || 0,
      workerPool: this.workerPool?.getStatus(),
    };
  }

  /**
   * Release the agent manager and the worker pool.
   * The worker pool is shut down even if closing the agent manager fails.
   *
   * @returns {Promise<void>}
   */
  async shutdown() {
    try {
      if (this.agentManager) {
        await this.agentManager.close();
      }
    } finally {
      if (this.workerPool) {
        await this.workerPool.shutdown();
      }
    }
  }

  /**
   * Alias for `shutdown`.
   *
   * @returns {Promise<void>}
   */
  async close() {
    return this.shutdown();
  }
}
|
|
|
|
// Export WorkflowStep (not exported with export class)
|
|
export { WorkflowStep };
|
|
|
|
// Default export
|
|
export default RealWorkflowOrchestrator;
|