# PostgreSQL Integration Specification ## Overview This document specifies how the Neural DAG system integrates with PostgreSQL via the pgrx framework, including type definitions, operators, functions, and background workers. ## Module Structure ``` crates/ruvector-postgres/src/dag/ ├── mod.rs # Module root ├── operators.rs # SQL function definitions ├── types.rs # PostgreSQL type mappings ├── hooks.rs # Query execution hooks ├── worker.rs # Background worker ├── gucs.rs # GUC variable definitions └── state.rs # Per-connection state ``` ## GUC Variables ### Definition ```rust // crates/ruvector-postgres/src/dag/gucs.rs use pgrx::prelude::*; use std::ffi::CStr; // Global enable/disable static NEURAL_DAG_ENABLED: GucSetting = GucSetting::new(true); // Learning parameters static DAG_LEARNING_RATE: GucSetting = GucSetting::new(0.002); static DAG_PATTERN_CLUSTERS: GucSetting = GucSetting::new(100); static DAG_QUALITY_THRESHOLD: GucSetting = GucSetting::new(0.3); static DAG_MAX_TRAJECTORIES: GucSetting = GucSetting::new(10000); // Attention parameters static DAG_ATTENTION_TYPE: GucSetting<&'static CStr> = GucSetting::new(unsafe { CStr::from_bytes_with_nul_unchecked(b"auto\0") }); static DAG_ATTENTION_EXPLORATION: GucSetting = GucSetting::new(0.1); // EWC parameters static DAG_EWC_LAMBDA: GucSetting = GucSetting::new(2000.0); static DAG_EWC_MAX_LAMBDA: GucSetting = GucSetting::new(15000.0); // MinCut parameters static DAG_MINCUT_ENABLED: GucSetting = GucSetting::new(true); static DAG_MINCUT_THRESHOLD: GucSetting = GucSetting::new(0.5); // Background worker parameters static DAG_BACKGROUND_INTERVAL_MS: GucSetting = GucSetting::new(3600000); pub fn register_gucs() { GucRegistry::define_bool_guc( "ruvector.neural_dag_enabled", "Enable neural DAG optimization", "When enabled, queries are optimized using learned patterns", &NEURAL_DAG_ENABLED, GucContext::Userset, GucFlags::default(), ); GucRegistry::define_float_guc( "ruvector.dag_learning_rate", "Learning rate for DAG optimization", "Controls how fast the system adapts to new patterns", &DAG_LEARNING_RATE, 0.0001, 1.0, GucContext::Userset, GucFlags::default(), ); GucRegistry::define_int_guc( "ruvector.dag_pattern_clusters", "Number of pattern clusters", "K-means clusters for pattern extraction", &DAG_PATTERN_CLUSTERS, 10, 1000, GucContext::Userset, GucFlags::default(), ); GucRegistry::define_float_guc( "ruvector.dag_quality_threshold", "Minimum quality for learning", "Trajectories below this quality are not learned", &DAG_QUALITY_THRESHOLD, 0.0, 1.0, GucContext::Userset, GucFlags::default(), ); GucRegistry::define_string_guc( "ruvector.dag_attention_type", "Default attention type", "Options: auto, topological, causal_cone, critical_path, mincut_gated, hierarchical_lorentz, parallel_branch, temporal_btsp", &DAG_ATTENTION_TYPE, GucContext::Userset, GucFlags::default(), ); GucRegistry::define_float_guc( "ruvector.dag_ewc_lambda", "EWC regularization strength", "Higher values prevent forgetting more aggressively", &DAG_EWC_LAMBDA, 100.0, 50000.0, GucContext::Userset, GucFlags::default(), ); GucRegistry::define_bool_guc( "ruvector.dag_mincut_enabled", "Enable MinCut analysis", "When enabled, bottleneck operators are identified", &DAG_MINCUT_ENABLED, GucContext::Userset, GucFlags::default(), ); GucRegistry::define_int_guc( "ruvector.dag_background_interval_ms", "Background learning interval (milliseconds)", "How often the background learning cycle runs", &DAG_BACKGROUND_INTERVAL_MS, 60000, // 1 minute minimum 86400000, // 24 hours maximum GucContext::Sighup, GucFlags::default(), ); } /// Get current configuration from GUCs pub fn get_config() -> DagSonaConfig { DagSonaConfig { enabled: NEURAL_DAG_ENABLED.get(), learning_rate: DAG_LEARNING_RATE.get() as f32, pattern_clusters: DAG_PATTERN_CLUSTERS.get() as usize, quality_threshold: DAG_QUALITY_THRESHOLD.get() as f32, max_trajectories: DAG_MAX_TRAJECTORIES.get() as usize, attention_type: parse_attention_type(DAG_ATTENTION_TYPE.get()), attention_exploration: DAG_ATTENTION_EXPLORATION.get() as f32, ewc_lambda: DAG_EWC_LAMBDA.get() as f32, ewc_max_lambda: DAG_EWC_MAX_LAMBDA.get() as f32, mincut_enabled: DAG_MINCUT_ENABLED.get(), mincut_threshold: DAG_MINCUT_THRESHOLD.get() as f32, background_interval_ms: DAG_BACKGROUND_INTERVAL_MS.get() as u64, ..Default::default() } } ``` ## State Management ### Global State ```rust // crates/ruvector-postgres/src/dag/state.rs use once_cell::sync::Lazy; use dashmap::DashMap; use std::sync::Arc; /// Global registry of DAG engines per table static DAG_ENGINES: Lazy>> = Lazy::new(|| DashMap::new()); /// Get or create DAG engine for a table pub fn get_or_create_engine(table_name: &str) -> Arc { DAG_ENGINES.entry(table_name.to_string()) .or_insert_with(|| { let config = gucs::get_config(); Arc::new(DagSonaEngine::new(table_name, config).unwrap()) }) .clone() } /// Check if neural DAG is enabled for a table pub fn is_enabled(table_name: &str) -> bool { DAG_ENGINES.contains_key(table_name) && gucs::get_config().enabled } /// Remove engine (on DROP TABLE or disable) pub fn remove_engine(table_name: &str) { DAG_ENGINES.remove(table_name); } /// Get all engine names (for monitoring) pub fn list_engines() -> Vec { DAG_ENGINES.iter().map(|e| e.key().clone()).collect() } ``` ### Per-Connection State ```rust /// Per-connection state for query tracking pub struct ConnectionState { /// Current query trajectory being built current_trajectory: Option, /// Query start time query_start: Option, /// Current table being queried current_table: Option, } thread_local! { static CONNECTION_STATE: RefCell = RefCell::new(ConnectionState { current_trajectory: None, query_start: None, current_table: None, }); } impl ConnectionState { pub fn start_query(&mut self, table_name: &str) { self.query_start = Some(Instant::now()); self.current_table = Some(table_name.to_string()); self.current_trajectory = Some(TrajectoryBuilder::new()); } pub fn end_query(&mut self) -> Option { let builder = self.current_trajectory.take()?; let start = self.query_start.take()?; let _table = self.current_table.take()?; Some(builder.build(start.elapsed())) } } ``` ## SQL Function Implementations ### Enable/Disable Functions ```rust // crates/ruvector-postgres/src/dag/operators.rs use pgrx::prelude::*; /// Enable neural DAG learning for a table #[pg_extern] fn ruvector_enable_neural_dag( table_name: &str, config: Option, ) -> bool { let base_config = gucs::get_config(); let config = if let Some(pgrx::JsonB(json)) = config { // Merge JSON config with base config merge_config(base_config, &json) } else { base_config }; // Validate table exists let table_oid = get_table_oid(table_name); if table_oid.is_none() { ereport!( PgLogLevel::ERROR, PgSqlErrorCode::ERRCODE_UNDEFINED_TABLE, format!("Table '{}' does not exist", table_name) ); return false; } // Create engine let engine = DagSonaEngine::new(table_name, config) .map_err(|e| { ereport!( PgLogLevel::ERROR, PgSqlErrorCode::ERRCODE_INTERNAL_ERROR, format!("Failed to create DAG engine: {}", e) ); }) .unwrap(); // Register engine DAG_ENGINES.insert(table_name.to_string(), Arc::new(engine)); // Start background worker if not running start_background_worker_if_needed(); true } /// Disable neural DAG learning for a table #[pg_extern] fn ruvector_disable_neural_dag(table_name: &str) -> bool { if DAG_ENGINES.remove(table_name).is_some() { true } else { warning!("Neural DAG was not enabled for table '{}'", table_name); false } } /// Check if neural DAG is enabled #[pg_extern] fn ruvector_neural_dag_enabled(table_name: &str) -> bool { state::is_enabled(table_name) } ``` ### Pattern Functions ```rust /// Get learned patterns for a table #[pg_extern] fn ruvector_dag_patterns( table_name: &str, ) -> TableIterator<'static, ( name!(pattern_id, i64), name!(centroid, Vec), name!(attention_type, String), name!(confidence, f32), name!(sample_count, i32), name!(avg_latency_us, f64), name!(avg_quality, f64), )> { let engine = match DAG_ENGINES.get(table_name) { Some(e) => e.clone(), None => { warning!("Neural DAG not enabled for table '{}'", table_name); return TableIterator::new(vec![].into_iter()); } }; let patterns: Vec<_> = { let bank = engine.dag_reasoning_bank.read(); bank.patterns.iter() .map(|entry| { let p = entry.value(); ( p.id as i64, p.centroid.clone(), format!("{:?}", p.optimal_attention), p.confidence, p.sample_count as i32, p.avg_metrics.latency_us, p.avg_metrics.quality, ) }) .collect() }; TableIterator::new(patterns.into_iter()) } /// Force pattern extraction #[pg_extern] fn ruvector_dag_extract_patterns( table_name: &str, num_clusters: Option, ) -> i32 { let engine = match DAG_ENGINES.get(table_name) { Some(e) => e.clone(), None => { ereport!( PgLogLevel::ERROR, PgSqlErrorCode::ERRCODE_INVALID_PARAMETER_VALUE, format!("Neural DAG not enabled for table '{}'", table_name) ); return 0; } }; // Override cluster count if specified if let Some(k) = num_clusters { // Temporary config override } match engine.run_background_cycle() { Ok(BackgroundCycleResult::Completed { patterns_extracted, .. }) => { patterns_extracted as i32 } Ok(BackgroundCycleResult::Skipped { reason, .. }) => { warning!("Pattern extraction skipped: {}", reason); 0 } Err(e) => { ereport!( PgLogLevel::ERROR, PgSqlErrorCode::ERRCODE_INTERNAL_ERROR, format!("Pattern extraction failed: {}", e) ); 0 } } } /// Consolidate similar patterns #[pg_extern] fn ruvector_dag_consolidate_patterns( table_name: &str, similarity_threshold: Option, ) -> i32 { let engine = match DAG_ENGINES.get(table_name) { Some(e) => e.clone(), None => { warning!("Neural DAG not enabled for table '{}'", table_name); return 0; } }; let threshold = similarity_threshold.unwrap_or(0.95); let before_count = { let bank = engine.dag_reasoning_bank.read(); bank.patterns.len() }; { let mut bank = engine.dag_reasoning_bank.write(); bank.consolidate(threshold); } let after_count = { let bank = engine.dag_reasoning_bank.read(); bank.patterns.len() }; (before_count - after_count) as i32 } ``` ### Attention Functions ```rust /// Compute topological attention #[pg_extern(immutable, parallel_safe)] fn ruvector_attention_topological( query: Vec, ancestors: Vec>, config: Option, ) -> Vec { let config = parse_attention_config(config); let attention = TopologicalAttention::new(config); let ctx = build_simple_context(&ancestors); let query_node = DagNode::from_embedding(query); match attention.forward(&query_node, &ctx, &config) { Ok(output) => output.weights, Err(e) => { warning!("Attention computation failed: {}", e); vec![1.0 / ancestors.len() as f32; ancestors.len()] // Uniform fallback } } } /// Compute causal cone attention #[pg_extern(immutable, parallel_safe)] fn ruvector_attention_causal_cone( query: Vec, ancestors: Vec>, depths: Vec, decay_rate: f32, max_depth: i32, ) -> Vec { let config = AttentionConfig { hidden_dim: query.len(), ..Default::default() }; let attention = CausalConeAttention::new( config.hidden_dim, 8, // num_heads decay_rate, max_depth as usize, ); let ctx = build_context_with_depths(&ancestors, &depths); let query_node = DagNode::from_embedding(query); match attention.forward(&query_node, &ctx, &config) { Ok(output) => output.weights, Err(e) => { warning!("Causal cone attention failed: {}", e); vec![1.0 / ancestors.len() as f32; ancestors.len()] } } } /// Compute critical path attention #[pg_extern(immutable, parallel_safe)] fn ruvector_attention_critical_path( query: Vec, ancestors: Vec>, is_critical: Vec, boost: f32, ) -> Vec { let config = AttentionConfig { hidden_dim: query.len(), ..Default::default() }; let attention = CriticalPathAttention::new( config.hidden_dim, 8, boost, ); let ctx = build_context_with_critical(&ancestors, &is_critical); let query_node = DagNode::from_embedding(query); match attention.forward(&query_node, &ctx, &config) { Ok(output) => output.weights, Err(e) => { warning!("Critical path attention failed: {}", e); vec![1.0 / ancestors.len() as f32; ancestors.len()] } } } /// Compute mincut gated attention #[pg_extern(immutable, parallel_safe)] fn ruvector_attention_mincut_gated( query: Vec, ancestors: Vec>, criticalities: Vec, threshold: f32, ) -> Vec { let config = AttentionConfig { hidden_dim: query.len(), ..Default::default() }; let attention = MinCutGatedAttention::new( config.hidden_dim, 8, threshold, ); let ctx = build_context_with_criticalities(&ancestors, &criticalities); let query_node = DagNode::from_embedding(query); match attention.forward(&query_node, &ctx, &config) { Ok(output) => output.weights, Err(e) => { warning!("MinCut gated attention failed: {}", e); vec![1.0 / ancestors.len() as f32; ancestors.len()] } } } /// Compute hierarchical Lorentz attention #[pg_extern(immutable, parallel_safe)] fn ruvector_attention_hierarchical_lorentz( query: Vec, ancestors: Vec>, depths: Vec, curvature: f32, temperature: f32, ) -> Vec { let config = AttentionConfig { hidden_dim: query.len(), ..Default::default() }; let attention = HierarchicalLorentzAttention::new( config.hidden_dim, 8, curvature, temperature, ); let ctx = build_context_with_depths(&ancestors, &depths); let query_node = DagNode::from_embedding(query); match attention.forward(&query_node, &ctx, &config) { Ok(output) => output.weights, Err(e) => { warning!("Hierarchical Lorentz attention failed: {}", e); vec![1.0 / ancestors.len() as f32; ancestors.len()] } } } /// Compute parallel branch attention #[pg_extern(immutable, parallel_safe)] fn ruvector_attention_parallel_branch( query: Vec, ancestors: Vec>, parallel_nodes: Vec>, common_ancestor_mask: Vec, cross_weight: f32, common_boost: f32, ) -> Vec { let config = AttentionConfig { hidden_dim: query.len(), ..Default::default() }; let attention = ParallelBranchAttention::new( config.hidden_dim, 8, cross_weight, common_boost, ); let ctx = build_context_with_parallel( &ancestors, ¶llel_nodes, &common_ancestor_mask, ); let query_node = DagNode::from_embedding(query); match attention.forward(&query_node, &ctx, &config) { Ok(output) => output.weights, Err(e) => { warning!("Parallel branch attention failed: {}", e); let total = ancestors.len() + parallel_nodes.len(); vec![1.0 / total as f32; total] } } } /// Compute temporal BTSP attention #[pg_extern(immutable, parallel_safe)] fn ruvector_attention_temporal_btsp( query: Vec, ancestors: Vec>, timestamps: Vec, window_ms: f32, boost: f32, ) -> Vec { let config = AttentionConfig { hidden_dim: query.len(), ..Default::default() }; let attention = TemporalBTSPAttention::new( config.hidden_dim, 8, window_ms, boost, ); let ctx = build_context_with_timestamps(&ancestors, ×tamps); let query_node = DagNode::from_embedding(query); match attention.forward(&query_node, &ctx, &config) { Ok(output) => output.weights, Err(e) => { warning!("Temporal BTSP attention failed: {}", e); vec![1.0 / ancestors.len() as f32; ancestors.len()] } } } /// Compute ensemble attention #[pg_extern(immutable, parallel_safe)] fn ruvector_attention_ensemble( query: Vec, ancestors: Vec>, attention_types: Vec, weights: Option>, ) -> Vec { let config = AttentionConfig { hidden_dim: query.len(), ..Default::default() }; let types: Vec = attention_types.iter() .filter_map(|s| parse_attention_type_str(s)) .collect(); let weights = weights.unwrap_or_else(|| { vec![1.0 / types.len() as f32; types.len()] }); let attention = EnsembleAttention::new(types, weights); let ctx = build_simple_context(&ancestors); let query_node = DagNode::from_embedding(query); match attention.forward(&query_node, &ctx, &config) { Ok(output) => output.weights, Err(e) => { warning!("Ensemble attention failed: {}", e); vec![1.0 / ancestors.len() as f32; ancestors.len()] } } } ``` ### MinCut Functions ```rust /// Compute mincut criticality for DAG nodes #[pg_extern] fn ruvector_dag_mincut_criticality( table_name: &str, ) -> TableIterator<'static, ( name!(node_id, i64), name!(criticality, f32), name!(is_bottleneck, bool), )> { let engine = match DAG_ENGINES.get(table_name) { Some(e) => e.clone(), None => { warning!("Neural DAG not enabled for table '{}'", table_name); return TableIterator::new(vec![].into_iter()); } }; let mincut = match &engine.mincut_engine { Some(m) => m.clone(), None => { warning!("MinCut not enabled for table '{}'", table_name); return TableIterator::new(vec![].into_iter()); } }; let threshold = gucs::get_config().mincut_threshold; // Compute criticalities let criticalities = mincut.compute_all_criticalities(); let results: Vec<_> = criticalities.into_iter() .map(|(node_id, crit)| { (node_id as i64, crit, crit > threshold) }) .collect(); TableIterator::new(results.into_iter()) } /// Analyze DAG bottlenecks #[pg_extern] fn ruvector_dag_mincut_analysis( table_name: &str, ) -> pgrx::JsonB { let engine = match DAG_ENGINES.get(table_name) { Some(e) => e.clone(), None => { return pgrx::JsonB(json!({ "error": "Neural DAG not enabled" })); } }; let mincut = match &engine.mincut_engine { Some(m) => m.clone(), None => { return pgrx::JsonB(json!({ "error": "MinCut not enabled" })); } }; let global_cut = mincut.query(); let criticalities = mincut.compute_all_criticalities(); let threshold = gucs::get_config().mincut_threshold; let bottlenecks: Vec<_> = criticalities.iter() .filter(|(_, c)| *c > threshold) .map(|(id, c)| json!({ "node_id": *id, "criticality": *c, })) .collect(); pgrx::JsonB(json!({ "global_mincut": global_cut, "bottleneck_count": bottlenecks.len(), "bottlenecks": bottlenecks, "threshold": threshold, })) } ``` ### Monitoring Functions ```rust /// Get learning statistics #[pg_extern] fn ruvector_dag_learning_stats(table_name: &str) -> pgrx::JsonB { let engine = match DAG_ENGINES.get(table_name) { Some(e) => e.clone(), None => { return pgrx::JsonB(json!({ "enabled": false, "error": "Neural DAG not enabled" })); } }; let metrics = engine.metrics.to_json(); let buffer_stats = engine.dag_trajectory_buffer.stats(); let pattern_count = { let bank = engine.dag_reasoning_bank.read(); bank.patterns.len() }; pgrx::JsonB(json!({ "enabled": true, "queries_processed": metrics["queries_processed"], "pattern_hit_rate": metrics["pattern_hit_rate"], "avg_latency_us": metrics["avg_latency_us"], "patterns_stored": pattern_count, "trajectories_buffered": buffer_stats.current_size, "trajectories_dropped": buffer_stats.dropped, "background_cycles": metrics["background_cycles"], "ewc_tasks": metrics["ewc_tasks"], })) } /// Get health report #[pg_extern] fn ruvector_dag_health_report(table_name: &str) -> pgrx::JsonB { let engine = match DAG_ENGINES.get(table_name) { Some(e) => e.clone(), None => { return pgrx::JsonB(json!({ "healthy": false, "error": "Neural DAG not enabled" })); } }; let metrics = engine.metrics.to_json(); let buffer_stats = engine.dag_trajectory_buffer.stats(); // Health checks let mut issues = Vec::new(); // Check drop rate let drop_rate = if buffer_stats.total_seen > 0 { buffer_stats.dropped as f64 / buffer_stats.total_seen as f64 } else { 0.0 }; if drop_rate > 0.1 { issues.push(format!("High trajectory drop rate: {:.1}%", drop_rate * 100.0)); } // Check pattern hit rate let hit_rate = metrics["pattern_hit_rate"].as_f64().unwrap_or(0.0); if hit_rate < 0.1 && metrics["queries_processed"].as_u64().unwrap_or(0) > 1000 { issues.push(format!("Low pattern hit rate: {:.1}%", hit_rate * 100.0)); } // Check MinCut bottlenecks if let Some(ref mincut) = engine.mincut_engine { let criticalities = mincut.compute_all_criticalities(); let severe_bottlenecks = criticalities.values() .filter(|&&c| c > 0.8) .count(); if severe_bottlenecks > 0 { issues.push(format!("{} severe bottlenecks detected", severe_bottlenecks)); } } pgrx::JsonB(json!({ "healthy": issues.is_empty(), "issues": issues, "metrics": metrics, "buffer": { "size": buffer_stats.current_size, "capacity": buffer_stats.capacity, "drop_rate": drop_rate, }, })) } /// Force a learning cycle #[pg_extern] fn ruvector_dag_learn(table_name: &str) -> pgrx::JsonB { let engine = match DAG_ENGINES.get(table_name) { Some(e) => e.clone(), None => { return pgrx::JsonB(json!({ "success": false, "error": "Neural DAG not enabled" })); } }; match engine.run_background_cycle() { Ok(BackgroundCycleResult::Completed { trajectories_processed, patterns_extracted, duration, }) => { pgrx::JsonB(json!({ "success": true, "trajectories_processed": trajectories_processed, "patterns_extracted": patterns_extracted, "duration_ms": duration.as_millis(), })) } Ok(BackgroundCycleResult::Skipped { reason, count }) => { pgrx::JsonB(json!({ "success": false, "skipped": true, "reason": reason, "trajectory_count": count, })) } Err(e) => { pgrx::JsonB(json!({ "success": false, "error": e.to_string(), })) } } } ``` ## Background Worker ```rust // crates/ruvector-postgres/src/dag/worker.rs use pgrx::bgworkers::*; use std::time::Duration; #[pg_guard] pub extern "C" fn dag_background_worker_main(_arg: pg_sys::Datum) { BackgroundWorker::attach_signal_handlers(SignalWakeFlags::SIGHUP | SignalWakeFlags::SIGTERM); log!( "DAG background worker started with interval {}ms", gucs::DAG_BACKGROUND_INTERVAL_MS.get() ); while BackgroundWorker::wait_latch(Some(Duration::from_millis( gucs::DAG_BACKGROUND_INTERVAL_MS.get() as u64 ))) { if BackgroundWorker::sighup_received() { // Reload configuration log!("DAG background worker received SIGHUP, reloading config"); } // Run learning cycle for all enabled tables for engine_ref in DAG_ENGINES.iter() { let table_name = engine_ref.key(); let engine = engine_ref.value(); match engine.run_background_cycle() { Ok(BackgroundCycleResult::Completed { patterns_extracted, .. }) => { log!( "DAG learning cycle completed for '{}': {} patterns", table_name, patterns_extracted ); } Ok(BackgroundCycleResult::Skipped { reason, .. }) => { // Normal - not enough data } Err(e) => { elog!( WARNING, "DAG learning cycle failed for '{}': {}", table_name, e ); } } } } log!("DAG background worker shutting down"); } /// Register the background worker pub fn register_background_worker() { BackgroundWorkerBuilder::new("ruvector_dag_worker") .set_function("dag_background_worker_main") .set_library("ruvector") .set_argument(0.into()) .enable_spi_access() .set_start_time(BgWorkerStartTime::RecoveryFinished) .set_restart_time(Some(Duration::from_secs(10))) .load(); } ``` ## Query Execution Hooks ```rust // crates/ruvector-postgres/src/dag/hooks.rs use pgrx::prelude::*; /// Hook into query execution for learning pub fn install_query_hooks() { // Install planner hook unsafe { prev_planner_hook = planner_hook; planner_hook = Some(dag_planner_hook); } // Install executor hooks unsafe { prev_ExecutorStart_hook = ExecutorStart_hook; ExecutorStart_hook = Some(dag_executor_start_hook); prev_ExecutorEnd_hook = ExecutorEnd_hook; ExecutorEnd_hook = Some(dag_executor_end_hook); } } /// Planner hook - optimize query plan #[pg_guard] unsafe extern "C" fn dag_planner_hook( parse: *mut pg_sys::Query, query_string: *const c_char, cursor_options: c_int, bound_params: *mut pg_sys::ParamListInfoData, ) -> *mut pg_sys::PlannedStmt { // Call original planner let stmt = if let Some(prev) = prev_planner_hook { prev(parse, query_string, cursor_options, bound_params) } else { pg_sys::standard_planner(parse, query_string, cursor_options, bound_params) }; // Check if neural DAG optimization applies if !gucs::NEURAL_DAG_ENABLED.get() { return stmt; } // Extract table name from query if let Some(table_name) = extract_table_name(parse) { if let Some(engine) = DAG_ENGINES.get(&table_name) { // Build neural plan wrapper let mut neural_plan = NeuralDagPlan::from_planned_stmt(stmt); // Apply learned optimizations let _ = engine.pre_query(&mut neural_plan); // Store for post-query processing CONNECTION_STATE.with(|state| { state.borrow_mut().start_query(&table_name); }); } } stmt } /// Executor start hook - record start time #[pg_guard] unsafe extern "C" fn dag_executor_start_hook( query_desc: *mut pg_sys::QueryDesc, eflags: c_int, ) { // Call original if let Some(prev) = prev_ExecutorStart_hook { prev(query_desc, eflags); } else { pg_sys::standard_ExecutorStart(query_desc, eflags); } // Record timing CONNECTION_STATE.with(|state| { if let Some(ref mut traj) = state.borrow_mut().current_trajectory { traj.mark_execution_start(); } }); } /// Executor end hook - record trajectory #[pg_guard] unsafe extern "C" fn dag_executor_end_hook(query_desc: *mut pg_sys::QueryDesc) { // Collect metrics before cleanup let metrics = extract_execution_metrics(query_desc); // Call original if let Some(prev) = prev_ExecutorEnd_hook { prev(query_desc); } else { pg_sys::standard_ExecutorEnd(query_desc); } // Record trajectory CONNECTION_STATE.with(|state| { let mut state = state.borrow_mut(); if let (Some(trajectory), Some(table_name)) = ( state.end_query(), state.current_table.as_ref(), ) { if let Some(engine) = DAG_ENGINES.get(table_name) { engine.post_query(&trajectory.plan, metrics); } } }); } fn extract_execution_metrics(query_desc: *mut pg_sys::QueryDesc) -> ExecutionMetrics { unsafe { let estate = (*query_desc).estate; ExecutionMetrics { latency_us: 0, // Computed from timestamps planning_us: 0, execution_us: 0, rows_processed: (*estate).es_processed as u64, memory_bytes: pg_sys::MemoryContextMemAllocated( (*estate).es_query_cxt, false, ) as u64, cache_hit_rate: 0.0, // Would need buffer stats max_criticality: None, } } } ``` ## Extension Initialization ```rust // crates/ruvector-postgres/src/lib.rs (additions) #[pg_guard] pub extern "C" fn _PG_init() { // ... existing initialization ... // Register DAG GUCs dag::gucs::register_gucs(); // Install query hooks dag::hooks::install_query_hooks(); // Register background worker dag::worker::register_background_worker(); pgrx::log!("RuVector Neural DAG module initialized"); } ``` ## SQL Installation Script ```sql -- Extension installation additions -- Create schema for DAG objects CREATE SCHEMA IF NOT EXISTS ruvector_dag; -- Pattern storage table (optional persistence) CREATE TABLE IF NOT EXISTS ruvector_dag.patterns ( id BIGSERIAL PRIMARY KEY, table_name TEXT NOT NULL, pattern_id BIGINT NOT NULL, centroid ruvector NOT NULL, attention_type TEXT NOT NULL, optimal_params JSONB NOT NULL, confidence FLOAT NOT NULL, sample_count INT NOT NULL, avg_latency_us FLOAT NOT NULL, avg_quality FLOAT NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW(), UNIQUE (table_name, pattern_id) ); -- Trajectory history table (optional) CREATE TABLE IF NOT EXISTS ruvector_dag.trajectory_history ( id BIGSERIAL PRIMARY KEY, table_name TEXT NOT NULL, trajectory_id BIGINT NOT NULL, plan_embedding ruvector NOT NULL, attention_type TEXT NOT NULL, latency_us BIGINT NOT NULL, quality FLOAT NOT NULL, recorded_at TIMESTAMPTZ DEFAULT NOW() ); -- Index for pattern lookup CREATE INDEX IF NOT EXISTS idx_patterns_table ON ruvector_dag.patterns (table_name); -- Index for trajectory analysis CREATE INDEX IF NOT EXISTS idx_trajectories_table_time ON ruvector_dag.trajectory_history (table_name, recorded_at DESC); -- Cleanup old trajectories (run periodically) CREATE OR REPLACE FUNCTION ruvector_dag.cleanup_old_trajectories( retention_days INT DEFAULT 7 ) RETURNS INT AS $$ DECLARE deleted_count INT; BEGIN DELETE FROM ruvector_dag.trajectory_history WHERE recorded_at < NOW() - (retention_days || ' days')::INTERVAL; GET DIAGNOSTICS deleted_count = ROW_COUNT; RETURN deleted_count; END; $$ LANGUAGE plpgsql; ```