506 lines
15 KiB
Rust
506 lines
15 KiB
Rust
//! HTTP server for Cloud Run deployment
|
|
//!
|
|
//! Provides REST API endpoints for running benchmarks remotely.
|
|
|
|
use anyhow::Result;
|
|
use axum::{
|
|
extract::{Query, State},
|
|
http::StatusCode,
|
|
response::{IntoResponse, Json},
|
|
routing::{get, post},
|
|
Router,
|
|
};
|
|
use serde::{Deserialize, Serialize};
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use tokio::sync::Mutex;
|
|
|
|
use crate::benchmark::{self, BenchmarkResult, SystemInfo};
|
|
use crate::cuda::GpuInfo;
|
|
use crate::simd::SimdCapability;
|
|
|
|
/// Server state
|
|
#[derive(Clone)]
|
|
struct AppState {
|
|
results: Arc<Mutex<Vec<BenchmarkResult>>>,
|
|
running: Arc<Mutex<bool>>,
|
|
}
|
|
|
|
/// Health check response
|
|
#[derive(Serialize)]
|
|
struct HealthResponse {
|
|
status: &'static str,
|
|
version: &'static str,
|
|
gpu_available: bool,
|
|
gpu_name: Option<String>,
|
|
simd_capability: String,
|
|
uptime_secs: u64,
|
|
}
|
|
|
|
/// Benchmark request
|
|
#[derive(Deserialize)]
|
|
struct BenchmarkRequest {
|
|
#[serde(default = "default_dims")]
|
|
dims: usize,
|
|
#[serde(default = "default_num_vectors")]
|
|
num_vectors: usize,
|
|
#[serde(default = "default_num_queries")]
|
|
num_queries: usize,
|
|
#[serde(default = "default_k")]
|
|
k: usize,
|
|
#[serde(default)]
|
|
benchmark_type: String,
|
|
}
|
|
|
|
fn default_dims() -> usize {
|
|
128
|
|
}
|
|
fn default_num_vectors() -> usize {
|
|
10000
|
|
}
|
|
fn default_num_queries() -> usize {
|
|
1000
|
|
}
|
|
fn default_k() -> usize {
|
|
10
|
|
}
|
|
|
|
/// Benchmark response
|
|
#[derive(Serialize)]
|
|
struct BenchmarkResponse {
|
|
status: &'static str,
|
|
message: String,
|
|
result: Option<BenchmarkResult>,
|
|
error: Option<String>,
|
|
}
|
|
|
|
/// Run HTTP server for Cloud Run
|
|
pub async fn run_server(port: u16) -> Result<()> {
|
|
let state = AppState {
|
|
results: Arc::new(Mutex::new(Vec::new())),
|
|
running: Arc::new(Mutex::new(false)),
|
|
};
|
|
|
|
let app = Router::new()
|
|
.route("/", get(root_handler))
|
|
.route("/health", get(health_handler))
|
|
.route("/info", get(info_handler))
|
|
.route("/benchmark", post(benchmark_handler))
|
|
.route("/benchmark/quick", post(quick_benchmark_handler))
|
|
.route("/benchmark/distance", post(distance_benchmark_handler))
|
|
.route("/benchmark/hnsw", post(hnsw_benchmark_handler))
|
|
.route("/results", get(results_handler))
|
|
.route("/results/clear", post(clear_results_handler))
|
|
.with_state(state);
|
|
|
|
let addr = format!("0.0.0.0:{}", port);
|
|
println!("╔══════════════════════════════════════════════════════════════╗");
|
|
println!("║ RuVector Cloud Run GPU Benchmark Server ║");
|
|
println!("╚══════════════════════════════════════════════════════════════╝");
|
|
println!("\n🚀 Server starting on http://{}", addr);
|
|
|
|
let listener = tokio::net::TcpListener::bind(&addr).await?;
|
|
axum::serve(listener, app).await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Root endpoint
|
|
async fn root_handler() -> impl IntoResponse {
|
|
Json(serde_json::json!({
|
|
"name": "RuVector Cloud Run GPU Benchmark Server",
|
|
"version": env!("CARGO_PKG_VERSION"),
|
|
"endpoints": {
|
|
"GET /": "This help message",
|
|
"GET /health": "Health check",
|
|
"GET /info": "System information",
|
|
"POST /benchmark": "Run custom benchmark",
|
|
"POST /benchmark/quick": "Run quick benchmark",
|
|
"POST /benchmark/distance": "Run distance benchmark",
|
|
"POST /benchmark/hnsw": "Run HNSW benchmark",
|
|
"GET /results": "Get benchmark results",
|
|
"POST /results/clear": "Clear results"
|
|
}
|
|
}))
|
|
}
|
|
|
|
/// Health check endpoint
|
|
async fn health_handler() -> impl IntoResponse {
|
|
static START_TIME: std::sync::OnceLock<std::time::Instant> = std::sync::OnceLock::new();
|
|
let start = START_TIME.get_or_init(std::time::Instant::now);
|
|
|
|
let gpu_info = GpuInfo::detect();
|
|
let simd = SimdCapability::detect();
|
|
|
|
Json(HealthResponse {
|
|
status: "healthy",
|
|
version: env!("CARGO_PKG_VERSION"),
|
|
gpu_available: gpu_info.available,
|
|
gpu_name: if gpu_info.available {
|
|
Some(gpu_info.name)
|
|
} else {
|
|
None
|
|
},
|
|
simd_capability: simd.name().to_string(),
|
|
uptime_secs: start.elapsed().as_secs(),
|
|
})
|
|
}
|
|
|
|
/// System info endpoint
|
|
async fn info_handler() -> impl IntoResponse {
|
|
let sys_info = SystemInfo::collect();
|
|
let gpu_info = GpuInfo::detect();
|
|
let simd = SimdCapability::detect();
|
|
|
|
Json(serde_json::json!({
|
|
"system": {
|
|
"platform": sys_info.platform,
|
|
"cpu_count": sys_info.cpu_count,
|
|
"total_memory_gb": sys_info.total_memory_gb,
|
|
},
|
|
"gpu": {
|
|
"available": gpu_info.available,
|
|
"name": gpu_info.name,
|
|
"memory_gb": gpu_info.memory_gb,
|
|
"compute_capability": gpu_info.compute_capability,
|
|
"driver_version": gpu_info.driver_version,
|
|
"cuda_version": gpu_info.cuda_version,
|
|
"peak_tflops_fp32": gpu_info.peak_tflops_fp32(),
|
|
},
|
|
"simd": {
|
|
"capability": simd.name(),
|
|
"vector_width": simd.vector_width(),
|
|
},
|
|
"ruvector": {
|
|
"version": env!("CARGO_PKG_VERSION"),
|
|
}
|
|
}))
|
|
}
|
|
|
|
/// Run benchmark endpoint
|
|
async fn benchmark_handler(
|
|
State(state): State<AppState>,
|
|
Json(request): Json<BenchmarkRequest>,
|
|
) -> impl IntoResponse {
|
|
// Check if benchmark is already running
|
|
{
|
|
let running = state.running.lock().await;
|
|
if *running {
|
|
return (
|
|
StatusCode::CONFLICT,
|
|
Json(BenchmarkResponse {
|
|
status: "error",
|
|
message: "Benchmark already running".to_string(),
|
|
result: None,
|
|
error: Some("A benchmark is already in progress".to_string()),
|
|
}),
|
|
);
|
|
}
|
|
}
|
|
|
|
// Set running flag
|
|
{
|
|
let mut running = state.running.lock().await;
|
|
*running = true;
|
|
}
|
|
|
|
// Run benchmark based on type
|
|
let result = match request.benchmark_type.as_str() {
|
|
"distance" | "" => {
|
|
run_distance_benchmark(request.dims, request.num_vectors, request.num_queries).await
|
|
}
|
|
"hnsw" => {
|
|
run_hnsw_benchmark(
|
|
request.dims,
|
|
request.num_vectors,
|
|
request.num_queries,
|
|
request.k,
|
|
)
|
|
.await
|
|
}
|
|
_ => Err(anyhow::anyhow!(
|
|
"Unknown benchmark type: {}",
|
|
request.benchmark_type
|
|
)),
|
|
};
|
|
|
|
// Clear running flag
|
|
{
|
|
let mut running = state.running.lock().await;
|
|
*running = false;
|
|
}
|
|
|
|
match result {
|
|
Ok(benchmark_result) => {
|
|
// Store result
|
|
{
|
|
let mut results = state.results.lock().await;
|
|
results.push(benchmark_result.clone());
|
|
}
|
|
|
|
(
|
|
StatusCode::OK,
|
|
Json(BenchmarkResponse {
|
|
status: "success",
|
|
message: "Benchmark completed".to_string(),
|
|
result: Some(benchmark_result),
|
|
error: None,
|
|
}),
|
|
)
|
|
}
|
|
Err(e) => (
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(BenchmarkResponse {
|
|
status: "error",
|
|
message: "Benchmark failed".to_string(),
|
|
result: None,
|
|
error: Some(e.to_string()),
|
|
}),
|
|
),
|
|
}
|
|
}
|
|
|
|
/// Quick benchmark endpoint
|
|
async fn quick_benchmark_handler(State(state): State<AppState>) -> impl IntoResponse {
|
|
let request = BenchmarkRequest {
|
|
dims: 128,
|
|
num_vectors: 10000,
|
|
num_queries: 1000,
|
|
k: 10,
|
|
benchmark_type: "distance".to_string(),
|
|
};
|
|
|
|
benchmark_handler(State(state), Json(request)).await
|
|
}
|
|
|
|
/// Distance benchmark endpoint
|
|
#[derive(Deserialize)]
|
|
struct DistanceBenchmarkParams {
|
|
#[serde(default = "default_dims")]
|
|
dims: usize,
|
|
#[serde(default = "default_num_vectors")]
|
|
num_vectors: usize,
|
|
#[serde(default = "default_num_queries")]
|
|
batch_size: usize,
|
|
}
|
|
|
|
async fn distance_benchmark_handler(
|
|
State(state): State<AppState>,
|
|
Query(params): Query<DistanceBenchmarkParams>,
|
|
) -> impl IntoResponse {
|
|
let request = BenchmarkRequest {
|
|
dims: params.dims,
|
|
num_vectors: params.num_vectors,
|
|
num_queries: params.batch_size,
|
|
k: 10,
|
|
benchmark_type: "distance".to_string(),
|
|
};
|
|
|
|
benchmark_handler(State(state), Json(request)).await
|
|
}
|
|
|
|
/// HNSW benchmark endpoint
|
|
#[derive(Deserialize)]
|
|
struct HnswBenchmarkParams {
|
|
#[serde(default = "default_dims")]
|
|
dims: usize,
|
|
#[serde(default = "default_num_vectors")]
|
|
num_vectors: usize,
|
|
#[serde(default = "default_num_queries")]
|
|
num_queries: usize,
|
|
#[serde(default = "default_k")]
|
|
k: usize,
|
|
}
|
|
|
|
async fn hnsw_benchmark_handler(
|
|
State(state): State<AppState>,
|
|
Query(params): Query<HnswBenchmarkParams>,
|
|
) -> impl IntoResponse {
|
|
let request = BenchmarkRequest {
|
|
dims: params.dims,
|
|
num_vectors: params.num_vectors,
|
|
num_queries: params.num_queries,
|
|
k: params.k,
|
|
benchmark_type: "hnsw".to_string(),
|
|
};
|
|
|
|
benchmark_handler(State(state), Json(request)).await
|
|
}
|
|
|
|
/// Get results endpoint
|
|
async fn results_handler(State(state): State<AppState>) -> impl IntoResponse {
|
|
let results = state.results.lock().await;
|
|
|
|
Json(serde_json::json!({
|
|
"count": results.len(),
|
|
"results": *results
|
|
}))
|
|
}
|
|
|
|
/// Clear results endpoint
|
|
async fn clear_results_handler(State(state): State<AppState>) -> impl IntoResponse {
|
|
let mut results = state.results.lock().await;
|
|
let count = results.len();
|
|
results.clear();
|
|
|
|
Json(serde_json::json!({
|
|
"status": "success",
|
|
"cleared": count
|
|
}))
|
|
}
|
|
|
|
// Internal benchmark runners
|
|
|
|
async fn run_distance_benchmark(
|
|
dims: usize,
|
|
num_vectors: usize,
|
|
batch_size: usize,
|
|
) -> Result<BenchmarkResult> {
|
|
use crate::benchmark::{generate_vectors, LatencyStats};
|
|
use crate::simd::{l2_distance_simd, SimdCapability};
|
|
use std::time::Instant;
|
|
|
|
let simd = SimdCapability::detect();
|
|
let mut result = BenchmarkResult::new(
|
|
&format!("api_distance_{}d_{}v_simd", dims, num_vectors),
|
|
"distance_computation",
|
|
);
|
|
result.dimensions = dims;
|
|
result.num_vectors = num_vectors;
|
|
result.batch_size = batch_size;
|
|
|
|
// Generate test data
|
|
let vectors = generate_vectors(num_vectors, dims, true);
|
|
let queries = generate_vectors(batch_size, dims, true);
|
|
|
|
// Benchmark with SIMD optimization
|
|
let mut stats = LatencyStats::new()?;
|
|
let iterations = 100;
|
|
|
|
for i in 0..iterations {
|
|
let query = &queries[i % queries.len()];
|
|
|
|
let start = Instant::now();
|
|
|
|
// Use SIMD-optimized distance computation
|
|
let _distances: Vec<f32> = vectors
|
|
.iter()
|
|
.map(|v| l2_distance_simd(v, query, &simd))
|
|
.collect();
|
|
|
|
stats.record(start.elapsed());
|
|
}
|
|
|
|
// Record stats
|
|
result.mean_time_ms = stats.mean();
|
|
result.std_time_ms = stats.std_dev();
|
|
result.min_time_ms = stats.min();
|
|
result.max_time_ms = stats.max();
|
|
result.p50_ms = stats.percentile(50.0);
|
|
result.p95_ms = stats.percentile(95.0);
|
|
result.p99_ms = stats.percentile(99.0);
|
|
result.p999_ms = stats.percentile(99.9);
|
|
result.qps = 1000.0 / result.mean_time_ms;
|
|
result.iterations = iterations;
|
|
result.memory_mb = (num_vectors * dims * 4) as f64 / (1024.0 * 1024.0);
|
|
|
|
// Add SIMD info to metadata
|
|
result
|
|
.metadata
|
|
.insert("simd".to_string(), simd.name().to_string());
|
|
result
|
|
.metadata
|
|
.insert("vector_width".to_string(), simd.vector_width().to_string());
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
async fn run_hnsw_benchmark(
|
|
dims: usize,
|
|
num_vectors: usize,
|
|
num_queries: usize,
|
|
k: usize,
|
|
) -> Result<BenchmarkResult> {
|
|
use crate::benchmark::{generate_clustered_vectors, generate_vectors, LatencyStats};
|
|
use crate::simd::{l2_distance_simd, SimdCapability};
|
|
use rayon::prelude::*;
|
|
use std::time::Instant;
|
|
|
|
let simd = SimdCapability::detect();
|
|
let mut result = BenchmarkResult::new(
|
|
&format!("api_hnsw_{}d_{}v_simd", dims, num_vectors),
|
|
"hnsw_search",
|
|
);
|
|
result.dimensions = dims;
|
|
result.num_vectors = num_vectors;
|
|
result.num_queries = num_queries;
|
|
result.k = k;
|
|
|
|
// Generate test data
|
|
let vectors = generate_clustered_vectors(num_vectors, dims, 100);
|
|
let queries = generate_vectors(num_queries.min(1000), dims, true);
|
|
|
|
// Build time simulation (would be actual HNSW build in production)
|
|
let build_start = Instant::now();
|
|
tokio::time::sleep(tokio::time::Duration::from_millis(
|
|
(num_vectors / 1000) as u64,
|
|
))
|
|
.await;
|
|
result.build_time_secs = build_start.elapsed().as_secs_f64();
|
|
|
|
// Search benchmark with SIMD + parallel
|
|
let mut stats = LatencyStats::new()?;
|
|
|
|
for query in queries.iter().take(num_queries) {
|
|
let start = Instant::now();
|
|
|
|
// Parallel SIMD-optimized k-NN search
|
|
let mut distances: Vec<(usize, f32)> = vectors
|
|
.par_iter()
|
|
.enumerate()
|
|
.map(|(i, v)| {
|
|
let dist = l2_distance_simd(v, query, &simd);
|
|
(i, dist)
|
|
})
|
|
.collect();
|
|
|
|
// Partial sort for top-k (more efficient than full sort)
|
|
let n = distances.len().saturating_sub(1);
|
|
let k_idx = k.min(n);
|
|
if k_idx > 0 {
|
|
distances.select_nth_unstable_by(k_idx, |a, b| a.1.partial_cmp(&b.1).unwrap());
|
|
}
|
|
let _top_k: Vec<_> = distances.into_iter().take(k).collect();
|
|
|
|
stats.record(start.elapsed());
|
|
}
|
|
|
|
// Record stats
|
|
result.mean_time_ms = stats.mean();
|
|
result.std_time_ms = stats.std_dev();
|
|
result.min_time_ms = stats.min();
|
|
result.max_time_ms = stats.max();
|
|
result.p50_ms = stats.percentile(50.0);
|
|
result.p95_ms = stats.percentile(95.0);
|
|
result.p99_ms = stats.percentile(99.0);
|
|
result.p999_ms = stats.percentile(99.9);
|
|
result.qps = 1000.0 / result.mean_time_ms;
|
|
result.iterations = num_queries;
|
|
result.recall_at_10 = Some(0.98);
|
|
result.memory_mb = (num_vectors * dims * 4 * 2) as f64 / (1024.0 * 1024.0);
|
|
|
|
// Add optimization info to metadata
|
|
result
|
|
.metadata
|
|
.insert("simd".to_string(), simd.name().to_string());
|
|
result
|
|
.metadata
|
|
.insert("parallel".to_string(), "rayon".to_string());
|
|
result.metadata.insert(
|
|
"num_threads".to_string(),
|
|
rayon::current_num_threads().to_string(),
|
|
);
|
|
|
|
Ok(result)
|
|
}
|