398 lines
11 KiB
Rust
398 lines
11 KiB
Rust
//! Auto-Sharding Tests
|
|
//!
|
|
//! Tests for:
|
|
//! - Consistent hashing for shard distribution
|
|
//! - Dynamic shard rebalancing
|
|
//! - Cross-shard queries
|
|
//! - Load balancing
|
|
|
|
use ruvector_cluster::{
|
|
ConsistentHashRing, ShardRouter, ClusterManager, ClusterConfig,
|
|
ClusterNode, ShardInfo, ShardStatus, NodeStatus,
|
|
discovery::StaticDiscovery,
|
|
shard::{ShardMigration, LoadBalancer, LoadStats},
|
|
};
|
|
use std::collections::HashMap;
|
|
use std::net::{SocketAddr, IpAddr, Ipv4Addr};
|
|
use std::time::{Duration, Instant};
|
|
|
|
/// Test consistent hash ring creation
|
|
#[tokio::test]
|
|
async fn test_consistent_hash_ring_creation() {
|
|
let ring = ConsistentHashRing::new(3);
|
|
|
|
assert_eq!(ring.node_count(), 0);
|
|
assert!(ring.list_nodes().is_empty());
|
|
}
|
|
|
|
/// Test adding nodes to hash ring
|
|
#[tokio::test]
|
|
async fn test_hash_ring_node_addition() {
|
|
let mut ring = ConsistentHashRing::new(3);
|
|
|
|
ring.add_node("node1".to_string());
|
|
ring.add_node("node2".to_string());
|
|
ring.add_node("node3".to_string());
|
|
|
|
assert_eq!(ring.node_count(), 3);
|
|
|
|
let nodes = ring.list_nodes();
|
|
assert!(nodes.contains(&"node1".to_string()));
|
|
assert!(nodes.contains(&"node2".to_string()));
|
|
assert!(nodes.contains(&"node3".to_string()));
|
|
}
|
|
|
|
/// Test node removal from hash ring
|
|
#[tokio::test]
|
|
async fn test_hash_ring_node_removal() {
|
|
let mut ring = ConsistentHashRing::new(3);
|
|
|
|
ring.add_node("node1".to_string());
|
|
ring.add_node("node2".to_string());
|
|
ring.add_node("node3".to_string());
|
|
|
|
assert_eq!(ring.node_count(), 3);
|
|
|
|
ring.remove_node("node2");
|
|
|
|
assert_eq!(ring.node_count(), 2);
|
|
assert!(!ring.list_nodes().contains(&"node2".to_string()));
|
|
}
|
|
|
|
/// Test key distribution across nodes
|
|
#[tokio::test]
|
|
async fn test_key_distribution() {
|
|
let mut ring = ConsistentHashRing::new(3);
|
|
|
|
ring.add_node("node1".to_string());
|
|
ring.add_node("node2".to_string());
|
|
ring.add_node("node3".to_string());
|
|
|
|
let mut distribution: HashMap<String, usize> = HashMap::new();
|
|
|
|
// Test distribution across many keys
|
|
for i in 0..10000 {
|
|
let key = format!("key-{}", i);
|
|
if let Some(node) = ring.get_primary_node(&key) {
|
|
*distribution.entry(node).or_insert(0) += 1;
|
|
}
|
|
}
|
|
|
|
println!("Key distribution across nodes:");
|
|
for (node, count) in &distribution {
|
|
let percentage = (*count as f64 / 10000.0) * 100.0;
|
|
println!(" {}: {} ({:.1}%)", node, count, percentage);
|
|
}
|
|
|
|
// Each node should get roughly 1/3 (within reasonable tolerance)
|
|
for count in distribution.values() {
|
|
let ratio = *count as f64 / 10000.0;
|
|
assert!(ratio > 0.2 && ratio < 0.5, "Uneven distribution: {:.3}", ratio);
|
|
}
|
|
}
|
|
|
|
/// Test replication factor compliance
|
|
#[tokio::test]
|
|
async fn test_replication_factor() {
|
|
let mut ring = ConsistentHashRing::new(3);
|
|
|
|
ring.add_node("node1".to_string());
|
|
ring.add_node("node2".to_string());
|
|
ring.add_node("node3".to_string());
|
|
ring.add_node("node4".to_string());
|
|
ring.add_node("node5".to_string());
|
|
|
|
// Request 3 nodes for replication
|
|
let nodes = ring.get_nodes("test-key", 3);
|
|
|
|
assert_eq!(nodes.len(), 3);
|
|
|
|
// All nodes should be unique
|
|
let unique: std::collections::HashSet<_> = nodes.iter().collect();
|
|
assert_eq!(unique.len(), 3);
|
|
}
|
|
|
|
/// Test shard router creation
|
|
#[tokio::test]
|
|
async fn test_shard_router() {
|
|
let router = ShardRouter::new(64);
|
|
|
|
let shard1 = router.get_shard("key1");
|
|
let shard2 = router.get_shard("key2");
|
|
|
|
assert!(shard1 < 64);
|
|
assert!(shard2 < 64);
|
|
|
|
// Same key should always map to same shard
|
|
let shard1_again = router.get_shard("key1");
|
|
assert_eq!(shard1, shard1_again);
|
|
}
|
|
|
|
/// Test jump consistent hash distribution
|
|
#[tokio::test]
|
|
async fn test_jump_consistent_hash() {
|
|
let router = ShardRouter::new(16);
|
|
|
|
let mut distribution: HashMap<u32, usize> = HashMap::new();
|
|
|
|
for i in 0..10000 {
|
|
let key = format!("test-key-{}", i);
|
|
let shard = router.get_shard(&key);
|
|
*distribution.entry(shard).or_insert(0) += 1;
|
|
}
|
|
|
|
println!("Shard distribution:");
|
|
let mut total = 0;
|
|
for shard in 0..16 {
|
|
let count = distribution.get(&shard).copied().unwrap_or(0);
|
|
total += count;
|
|
println!(" Shard {}: {}", shard, count);
|
|
}
|
|
|
|
assert_eq!(total, 10000);
|
|
|
|
// Check for reasonably even distribution
|
|
let expected = 10000 / 16;
|
|
for count in distribution.values() {
|
|
let deviation = (*count as i32 - expected as i32).abs() as f64 / expected as f64;
|
|
assert!(deviation < 0.5, "Shard distribution too uneven");
|
|
}
|
|
}
|
|
|
|
/// Test shard router caching
|
|
#[tokio::test]
|
|
async fn test_shard_router_caching() {
|
|
let router = ShardRouter::new(64);
|
|
|
|
// First access
|
|
let _ = router.get_shard("cached-key");
|
|
|
|
let stats = router.cache_stats();
|
|
assert_eq!(stats.entries, 1);
|
|
|
|
// Second access (should hit cache)
|
|
let _ = router.get_shard("cached-key");
|
|
|
|
// Add more keys
|
|
for i in 0..100 {
|
|
router.get_shard(&format!("key-{}", i));
|
|
}
|
|
|
|
let stats = router.cache_stats();
|
|
assert_eq!(stats.entries, 101); // 1 original + 100 new
|
|
}
|
|
|
|
/// Test cache clearing
|
|
#[tokio::test]
|
|
async fn test_cache_clearing() {
|
|
let router = ShardRouter::new(32);
|
|
|
|
for i in 0..50 {
|
|
router.get_shard(&format!("key-{}", i));
|
|
}
|
|
|
|
assert_eq!(router.cache_stats().entries, 50);
|
|
|
|
router.clear_cache();
|
|
|
|
assert_eq!(router.cache_stats().entries, 0);
|
|
}
|
|
|
|
/// Test cluster manager creation
|
|
#[tokio::test]
|
|
async fn test_cluster_manager_creation() {
|
|
let config = ClusterConfig::default();
|
|
let discovery = Box::new(StaticDiscovery::new(vec![]));
|
|
|
|
let manager = ClusterManager::new(config, "test-node".to_string(), discovery);
|
|
|
|
assert!(manager.is_ok());
|
|
}
|
|
|
|
/// Test cluster node management
|
|
#[tokio::test]
|
|
async fn test_cluster_node_management() {
|
|
let config = ClusterConfig {
|
|
shard_count: 8,
|
|
replication_factor: 2,
|
|
..Default::default()
|
|
};
|
|
let discovery = Box::new(StaticDiscovery::new(vec![]));
|
|
let manager = ClusterManager::new(config, "coordinator".to_string(), discovery).unwrap();
|
|
|
|
// Add nodes
|
|
for i in 0..3 {
|
|
let node = ClusterNode::new(
|
|
format!("node{}", i),
|
|
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000 + i as u16),
|
|
);
|
|
manager.add_node(node).await.unwrap();
|
|
}
|
|
|
|
assert_eq!(manager.list_nodes().len(), 3);
|
|
|
|
// Remove a node
|
|
manager.remove_node("node1").await.unwrap();
|
|
assert_eq!(manager.list_nodes().len(), 2);
|
|
}
|
|
|
|
/// Test shard assignment
|
|
#[tokio::test]
|
|
async fn test_shard_assignment() {
|
|
let config = ClusterConfig {
|
|
shard_count: 4,
|
|
replication_factor: 2,
|
|
..Default::default()
|
|
};
|
|
let discovery = Box::new(StaticDiscovery::new(vec![]));
|
|
let manager = ClusterManager::new(config, "coordinator".to_string(), discovery).unwrap();
|
|
|
|
// Add nodes
|
|
for i in 0..3 {
|
|
let node = ClusterNode::new(
|
|
format!("node{}", i),
|
|
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000 + i as u16),
|
|
);
|
|
manager.add_node(node).await.unwrap();
|
|
}
|
|
|
|
// Assign a shard
|
|
let shard = manager.assign_shard(0).unwrap();
|
|
|
|
assert_eq!(shard.shard_id, 0);
|
|
assert!(!shard.primary_node.is_empty());
|
|
assert_eq!(shard.status, ShardStatus::Active);
|
|
}
|
|
|
|
/// Test shard migration
|
|
#[tokio::test]
|
|
async fn test_shard_migration() {
|
|
let mut migration = ShardMigration::new(0, 1, 1000);
|
|
|
|
assert!(!migration.is_complete());
|
|
assert_eq!(migration.progress, 0.0);
|
|
|
|
// Simulate partial migration
|
|
migration.update_progress(500);
|
|
assert_eq!(migration.progress, 0.5);
|
|
assert!(!migration.is_complete());
|
|
|
|
// Complete migration
|
|
migration.update_progress(1000);
|
|
assert_eq!(migration.progress, 1.0);
|
|
assert!(migration.is_complete());
|
|
}
|
|
|
|
/// Test load balancer
|
|
#[tokio::test]
|
|
async fn test_load_balancer() {
|
|
let balancer = LoadBalancer::new();
|
|
|
|
// Update loads for shards
|
|
balancer.update_load(0, 0.3);
|
|
balancer.update_load(1, 0.8);
|
|
balancer.update_load(2, 0.5);
|
|
balancer.update_load(3, 0.2);
|
|
|
|
// Get loads
|
|
assert_eq!(balancer.get_load(0), 0.3);
|
|
assert_eq!(balancer.get_load(1), 0.8);
|
|
|
|
// Get least loaded shard
|
|
let least_loaded = balancer.get_least_loaded_shard(&[0, 1, 2, 3]);
|
|
assert_eq!(least_loaded, Some(3));
|
|
|
|
// Get statistics
|
|
let stats = balancer.get_stats();
|
|
assert_eq!(stats.shard_count, 4);
|
|
assert!(stats.avg_load > 0.0);
|
|
assert!(stats.max_load == 0.8);
|
|
assert!(stats.min_load == 0.2);
|
|
}
|
|
|
|
/// Test cluster statistics
|
|
#[tokio::test]
|
|
async fn test_cluster_statistics() {
|
|
let config = ClusterConfig {
|
|
shard_count: 4,
|
|
replication_factor: 2,
|
|
..Default::default()
|
|
};
|
|
let discovery = Box::new(StaticDiscovery::new(vec![]));
|
|
let manager = ClusterManager::new(config, "coordinator".to_string(), discovery).unwrap();
|
|
|
|
// Add nodes
|
|
for i in 0..3 {
|
|
let node = ClusterNode::new(
|
|
format!("node{}", i),
|
|
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000 + i as u16),
|
|
);
|
|
manager.add_node(node).await.unwrap();
|
|
}
|
|
|
|
let stats = manager.get_stats();
|
|
|
|
assert_eq!(stats.total_nodes, 3);
|
|
assert_eq!(stats.healthy_nodes, 3);
|
|
}
|
|
|
|
/// Performance test for shard routing
|
|
#[tokio::test]
|
|
async fn test_shard_routing_performance() {
|
|
let router = ShardRouter::new(256);
|
|
|
|
let start = Instant::now();
|
|
let iterations = 100000;
|
|
|
|
for i in 0..iterations {
|
|
let key = format!("perf-key-{}", i);
|
|
let _ = router.get_shard(&key);
|
|
}
|
|
|
|
let elapsed = start.elapsed();
|
|
let ops_per_sec = iterations as f64 / elapsed.as_secs_f64();
|
|
|
|
println!("Shard routing: {:.0} ops/sec", ops_per_sec);
|
|
|
|
// Should be able to route millions of keys per second
|
|
assert!(ops_per_sec > 100000.0, "Routing too slow: {:.0} ops/sec", ops_per_sec);
|
|
}
|
|
|
|
/// Test key stability after node addition
|
|
#[tokio::test]
|
|
async fn test_key_stability_on_node_addition() {
|
|
let mut ring = ConsistentHashRing::new(3);
|
|
|
|
ring.add_node("node1".to_string());
|
|
ring.add_node("node2".to_string());
|
|
ring.add_node("node3".to_string());
|
|
|
|
// Record initial assignments
|
|
let mut initial_assignments: HashMap<String, String> = HashMap::new();
|
|
for i in 0..1000 {
|
|
let key = format!("stable-key-{}", i);
|
|
if let Some(node) = ring.get_primary_node(&key) {
|
|
initial_assignments.insert(key, node);
|
|
}
|
|
}
|
|
|
|
// Add a new node
|
|
ring.add_node("node4".to_string());
|
|
|
|
// Check how many keys changed
|
|
let mut changes = 0;
|
|
for (key, original_node) in &initial_assignments {
|
|
if let Some(new_node) = ring.get_primary_node(key) {
|
|
if new_node != *original_node {
|
|
changes += 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
let change_ratio = changes as f64 / initial_assignments.len() as f64;
|
|
println!("Keys reassigned after adding node: {} ({:.1}%)", changes, change_ratio * 100.0);
|
|
|
|
// With consistent hashing, only ~25% of keys should move (1/4 for 3->4 nodes)
|
|
assert!(change_ratio < 0.4, "Too many keys reassigned: {:.1}%", change_ratio * 100.0);
|
|
}
|