From b5ef76dd65d18d5002dba1b727869ba8da89f4c7 Mon Sep 17 00:00:00 2001 From: Razvan Dimescu Date: Sun, 29 Mar 2026 10:25:11 +0300 Subject: [PATCH] style: cargo fmt Co-Authored-By: Claude Opus 4.6 --- src/ctx.rs | 12 ++++-- src/recursive.rs | 3 +- src/srtt.rs | 100 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 110 insertions(+), 5 deletions(-) diff --git a/src/ctx.rs b/src/ctx.rs index d0193b6..2f8bda0 100644 --- a/src/ctx.rs +++ b/src/ctx.rs @@ -760,9 +760,9 @@ mod tests { for i in 0..5u16 { let ctx = ctx.clone(); let buf = build_wire_query(100 + i, "coalesce-test.example.com", QueryType::A); - handles.push(tokio::spawn(async move { - handle_query(buf, src, &ctx).await - })); + handles.push(tokio::spawn( + async move { handle_query(buf, src, &ctx).await }, + )); } for h in handles { @@ -800,7 +800,11 @@ mod tests { h2.await.unwrap().unwrap(); let actual = query_count.load(std::sync::atomic::Ordering::Relaxed); - assert!(actual >= 2, "A and AAAA should resolve independently, got {}", actual); + assert!( + actual >= 2, + "A and AAAA should resolve independently, got {}", + actual + ); assert!(ctx.inflight.lock().unwrap().is_empty()); crate::recursive::reset_udp_state(); diff --git a/src/recursive.rs b/src/recursive.rs index 7df70eb..54a9625 100644 --- a/src/recursive.rs +++ b/src/recursive.rs @@ -21,7 +21,8 @@ const UDP_FAIL_THRESHOLD: u8 = 3; static QUERY_ID: AtomicU16 = AtomicU16::new(1); static UDP_FAILURES: std::sync::atomic::AtomicU8 = std::sync::atomic::AtomicU8::new(0); -pub(crate) static UDP_DISABLED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); +pub(crate) static UDP_DISABLED: std::sync::atomic::AtomicBool = + std::sync::atomic::AtomicBool::new(false); fn next_id() -> u16 { QUERY_ID.fetch_add(1, Ordering::Relaxed) diff --git a/src/srtt.rs b/src/srtt.rs index 3fe4694..e44efbb 100644 --- a/src/srtt.rs +++ b/src/srtt.rs @@ -108,6 +108,13 @@ impl SrttCache { self.entries.is_empty() } + #[cfg(test)] + fn set_updated_at(&mut self, ip: IpAddr, at: Instant) { + if let Some(entry) = self.entries.get_mut(&ip) { + entry.updated_at = at; + } + } + fn maybe_evict(&mut self) { if self.entries.len() < MAX_ENTRIES { return; @@ -203,6 +210,99 @@ mod tests { assert_eq!(addrs, original); } + fn age(secs: u64) -> Instant { + Instant::now() - std::time::Duration::from_secs(secs) + } + + /// Cache with ip(1) saturated at FAILURE_PENALTY_MS + fn saturated_penalty_cache() -> SrttCache { + let mut cache = SrttCache::new(true); + for _ in 0..30 { + cache.record_rtt(ip(1), FAILURE_PENALTY_MS, false); + } + cache + } + + #[test] + fn no_decay_within_threshold() { + let mut cache = SrttCache::new(true); + cache.record_rtt(ip(1), 5000, false); + cache.set_updated_at(ip(1), age(DECAY_AFTER_SECS)); + assert_eq!(cache.get(ip(1)), cache.entries[&ip(1)].srtt_ms); + } + + #[test] + fn one_decay_period() { + let mut cache = saturated_penalty_cache(); + let raw = cache.entries[&ip(1)].srtt_ms; + cache.set_updated_at(ip(1), age(DECAY_AFTER_SECS + 1)); + let expected = (raw + INITIAL_SRTT_MS) / 2; + assert_eq!(cache.get(ip(1)), expected); + } + + #[test] + fn multiple_decay_periods() { + let mut cache = saturated_penalty_cache(); + let raw = cache.entries[&ip(1)].srtt_ms; + cache.set_updated_at(ip(1), age(DECAY_AFTER_SECS * 4 + 1)); + let mut expected = raw; + for _ in 0..4 { + expected = (expected + INITIAL_SRTT_MS) / 2; + } + assert_eq!(cache.get(ip(1)), expected); + } + + #[test] + fn decay_caps_at_8_periods() { + // 9 periods and 100 periods should produce the same result (capped at 8) + let mut cache_a = saturated_penalty_cache(); + let mut cache_b = saturated_penalty_cache(); + cache_a.set_updated_at(ip(1), age(DECAY_AFTER_SECS * 9 + 1)); + cache_b.set_updated_at(ip(1), age(DECAY_AFTER_SECS * 100)); + assert_eq!(cache_a.get(ip(1)), cache_b.get(ip(1))); + } + + #[test] + fn decay_converges_toward_initial() { + let mut cache = saturated_penalty_cache(); + cache.set_updated_at(ip(1), age(DECAY_AFTER_SECS * 100)); + let decayed = cache.get(ip(1)); + let diff = decayed.abs_diff(INITIAL_SRTT_MS); + assert!( + diff < 25, + "expected near INITIAL_SRTT_MS, got {} (diff={})", + decayed, + diff + ); + } + + #[test] + fn record_rtt_applies_decay_before_ewma() { + let mut cache = saturated_penalty_cache(); + cache.set_updated_at(ip(1), age(DECAY_AFTER_SECS * 8)); + cache.record_rtt(ip(1), 50, false); + let srtt = cache.get(ip(1)); + // Without decay-before-EWMA, result would be ~(5000*7+50)/8 ≈ 4381 + assert!(srtt < 500, "expected decay before EWMA, got srtt={}", srtt); + } + + #[test] + fn decay_reranks_stale_failures() { + let mut cache = saturated_penalty_cache(); + for _ in 0..30 { + cache.record_rtt(ip(2), 300, false); + } + let mut addrs = vec![sock(1), sock(2)]; + cache.sort_by_rtt(&mut addrs); + assert_eq!(addrs, vec![sock(2), sock(1)]); + + // Age server 1 so it decays toward INITIAL (200ms) — below server 2's 300ms + cache.set_updated_at(ip(1), age(DECAY_AFTER_SECS * 100)); + let mut addrs = vec![sock(1), sock(2)]; + cache.sort_by_rtt(&mut addrs); + assert_eq!(addrs, vec![sock(1), sock(2)]); + } + #[test] fn eviction_removes_oldest() { let mut cache = SrttCache::new(true);