feat: in-flight query coalescing with COALESCED path #20

Merged
razvandimescu merged 5 commits from feat/query-coalescing into main 2026-03-29 15:36:02 +08:00
5 changed files with 363 additions and 3 deletions
Showing only changes of commit 5869f29f9f - Show all commits

View File

@@ -285,6 +285,7 @@ body {
.path-tag.OVERRIDE { background: rgba(82, 122, 82, 0.12); color: var(--emerald); }
.path-tag.SERVFAIL { background: rgba(181, 68, 58, 0.12); color: var(--rose); }
.path-tag.BLOCKED { background: rgba(163, 152, 136, 0.15); color: var(--text-dim); }
.path-tag.COALESCED { background: rgba(138, 104, 158, 0.12); color: var(--violet-dim); }
/* Sidebar panels */
.sidebar {
@@ -547,6 +548,8 @@ body {
<select id="logFilterPath" onchange="applyLogFilter()"
style="font-family:var(--font-mono);font-size:0.7rem;padding:0.25rem 0.4rem;border:1px solid var(--border);border-radius:4px;background:var(--bg-surface);color:var(--text-secondary);outline:none;">
<option value="">all paths</option>
<option value="RECURSIVE">recursive</option>
<option value="COALESCED">coalesced</option>
<option value="FORWARD">forward</option>
<option value="CACHED">cached</option>
<option value="BLOCKED">blocked</option>

View File

@@ -182,6 +182,7 @@ struct QueriesStats {
total: u64,
forwarded: u64,
recursive: u64,
coalesced: u64,
cached: u64,
local: u64,
overridden: u64,
@@ -499,6 +500,7 @@ async fn stats(State(ctx): State<Arc<ServerCtx>>) -> Json<StatsResponse> {
total: snap.total,
forwarded: snap.forwarded,
recursive: snap.recursive,
coalesced: snap.coalesced,
cached: snap.cached,
local: snap.local,
overridden: snap.overridden,

View File

@@ -201,7 +201,7 @@ pub async fn handle_query(
match rx.recv().await {
Ok(Some(mut resp)) => {
resp.header.id = query.header.id;
(resp, QueryPath::Recursive, DnssecStatus::Indeterminate)
(resp, QueryPath::Coalesced, DnssecStatus::Indeterminate)
}
_ => (
DnsPacket::response_from(&query, ResultCode::SERVFAIL),
@@ -442,6 +442,20 @@ impl Drop for InflightGuard<'_> {
}
}
/// Build a wire-format DNS query packet for the given domain and type.
#[cfg(test)]
fn build_wire_query(id: u16, domain: &str, qtype: QueryType) -> BytePacketBuffer {
let mut pkt = DnsPacket::new();
pkt.header.id = id;
pkt.header.recursion_desired = true;
pkt.header.questions = 1;
pkt.questions
.push(crate::question::DnsQuestion::new(domain.to_string(), qtype));
let mut buf = BytePacketBuffer::new();
pkt.write(&mut buf).unwrap();
BytePacketBuffer::from_bytes(buf.filled())
}
fn special_use_response(query: &DnsPacket, qname: &str, qtype: QueryType) -> DnsPacket {
use std::net::{Ipv4Addr, Ipv6Addr};
if qname == "ipv4only.arpa" {
@@ -475,3 +489,334 @@ fn special_use_response(query: &DnsPacket, qname: &str, qtype: QueryType) -> Dns
DnsPacket::response_from(query, ResultCode::NXDOMAIN)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::{Arc, Mutex, RwLock};
use tokio::sync::broadcast;
// ---- InflightGuard unit tests ----
#[test]
fn inflight_guard_removes_key_on_drop() {
let map: Mutex<InflightMap> = Mutex::new(HashMap::new());
let key = ("example.com".to_string(), QueryType::A);
let (tx, _) = broadcast::channel::<Option<DnsPacket>>(1);
map.lock().unwrap().insert(key.clone(), tx);
assert_eq!(map.lock().unwrap().len(), 1);
{
let _guard = InflightGuard {
inflight: &map,
key: key.clone(),
};
} // guard dropped here
assert!(map.lock().unwrap().is_empty());
}
#[test]
fn inflight_guard_only_removes_own_key() {
let map: Mutex<InflightMap> = Mutex::new(HashMap::new());
let key_a = ("a.com".to_string(), QueryType::A);
let key_b = ("b.com".to_string(), QueryType::A);
let (tx_a, _) = broadcast::channel::<Option<DnsPacket>>(1);
let (tx_b, _) = broadcast::channel::<Option<DnsPacket>>(1);
map.lock().unwrap().insert(key_a.clone(), tx_a);
map.lock().unwrap().insert(key_b.clone(), tx_b);
{
let _guard = InflightGuard {
inflight: &map,
key: key_a,
};
}
let m = map.lock().unwrap();
assert_eq!(m.len(), 1);
assert!(m.contains_key(&key_b));
}
#[test]
fn inflight_guard_same_domain_different_qtype_independent() {
let map: Mutex<InflightMap> = Mutex::new(HashMap::new());
let key_a = ("example.com".to_string(), QueryType::A);
let key_aaaa = ("example.com".to_string(), QueryType::AAAA);
let (tx_a, _) = broadcast::channel::<Option<DnsPacket>>(1);
let (tx_aaaa, _) = broadcast::channel::<Option<DnsPacket>>(1);
map.lock().unwrap().insert(key_a.clone(), tx_a);
map.lock().unwrap().insert(key_aaaa.clone(), tx_aaaa);
{
let _guard = InflightGuard {
inflight: &map,
key: key_a,
};
}
let m = map.lock().unwrap();
assert_eq!(m.len(), 1);
assert!(m.contains_key(&key_aaaa));
}
// ---- Coalescing disposition tests ----
#[test]
fn leader_follower_disposition() {
// First caller becomes leader, second becomes follower
let map: Mutex<InflightMap> = Mutex::new(HashMap::new());
let key = ("test.com".to_string(), QueryType::A);
// First: no entry → insert and become leader
let is_leader = {
let mut m = map.lock().unwrap();
if m.get(&key).is_some() {
false
} else {
let (tx, _) = broadcast::channel::<Option<DnsPacket>>(1);
m.insert(key.clone(), tx);
true
}
};
assert!(is_leader);
// Second: entry exists → become follower
let is_follower = {
let m = map.lock().unwrap();
m.get(&key).is_some()
};
assert!(is_follower);
}
#[tokio::test]
async fn broadcast_delivers_result_to_follower() {
let (tx, _) = broadcast::channel::<Option<DnsPacket>>(1);
let mut rx = tx.subscribe();
let mut resp = DnsPacket::new();
resp.header.id = 42;
resp.answers.push(DnsRecord::A {
domain: "test.com".into(),
addr: Ipv4Addr::new(1, 2, 3, 4),
ttl: 300,
});
let _ = tx.send(Some(resp));
let received = rx.recv().await.unwrap().unwrap();
assert_eq!(received.header.id, 42);
assert_eq!(received.answers.len(), 1);
}
#[tokio::test]
async fn broadcast_none_signals_failure() {
let (tx, _) = broadcast::channel::<Option<DnsPacket>>(1);
let mut rx = tx.subscribe();
let _ = tx.send(None);
let received = rx.recv().await.unwrap();
assert!(received.is_none());
}
#[tokio::test]
async fn multiple_followers_all_receive_result() {
let (tx, _) = broadcast::channel::<Option<DnsPacket>>(1);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
let mut rx3 = tx.subscribe();
let mut resp = DnsPacket::new();
resp.answers.push(DnsRecord::A {
domain: "multi.com".into(),
addr: Ipv4Addr::new(10, 0, 0, 1),
ttl: 60,
});
let _ = tx.send(Some(resp));
for rx in [&mut rx1, &mut rx2, &mut rx3] {
let r = rx.recv().await.unwrap().unwrap();
assert_eq!(r.answers.len(), 1);
}
}
// ---- Integration: concurrent handle_query coalescing ----
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
/// Spawn a slow TCP DNS server that delays `delay` before responding.
/// Returns (addr, query_count) where query_count is an Arc<AtomicU32>
/// tracking how many queries were actually resolved (not coalesced).
async fn spawn_slow_dns_server(
delay: Duration,
) -> (SocketAddr, Arc<std::sync::atomic::AtomicU32>) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
let count_clone = count.clone();
tokio::spawn(async move {
loop {
let (mut stream, _) = match listener.accept().await {
Ok(c) => c,
Err(_) => break,
};
let count = count_clone.clone();
let delay = delay;
tokio::spawn(async move {
let mut len_buf = [0u8; 2];
if stream.read_exact(&mut len_buf).await.is_err() {
return;
}
let len = u16::from_be_bytes(len_buf) as usize;
let mut data = vec![0u8; len];
if stream.read_exact(&mut data).await.is_err() {
return;
}
let mut buf = BytePacketBuffer::from_bytes(&data);
let query = match DnsPacket::from_buffer(&mut buf) {
Ok(q) => q,
Err(_) => return,
};
count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// Deliberate delay to create coalescing window
tokio::time::sleep(delay).await;
let mut resp = DnsPacket::response_from(&query, ResultCode::NOERROR);
resp.header.authoritative_answer = true;
if let Some(q) = query.questions.first() {
resp.answers.push(DnsRecord::A {
domain: q.name.clone(),
addr: Ipv4Addr::new(10, 0, 0, 1),
ttl: 300,
});
}
let mut resp_buf = BytePacketBuffer::new();
if resp.write(&mut resp_buf).is_err() {
return;
}
let resp_bytes = resp_buf.filled();
let mut out = Vec::with_capacity(2 + resp_bytes.len());
out.extend_from_slice(&(resp_bytes.len() as u16).to_be_bytes());
out.extend_from_slice(resp_bytes);
let _ = stream.write_all(&out).await;
});
}
});
(addr, count)
}
async fn test_recursive_ctx(root_hint: SocketAddr) -> Arc<ServerCtx> {
let socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
Arc::new(ServerCtx {
socket,
zone_map: HashMap::new(),
cache: RwLock::new(crate::cache::DnsCache::new(100, 60, 86400)),
stats: Mutex::new(crate::stats::ServerStats::new()),
overrides: RwLock::new(crate::override_store::OverrideStore::new()),
blocklist: RwLock::new(crate::blocklist::BlocklistStore::new()),
query_log: Mutex::new(crate::query_log::QueryLog::new(100)),
services: Mutex::new(crate::service_store::ServiceStore::new()),
lan_peers: Mutex::new(crate::lan::PeerStore::new(90)),
forwarding_rules: Vec::new(),
upstream: Mutex::new(crate::forward::Upstream::Udp(
"127.0.0.1:53".parse().unwrap(),
)),
upstream_auto: false,
upstream_port: 53,
lan_ip: Mutex::new(Ipv4Addr::LOCALHOST),
timeout: Duration::from_secs(3),
proxy_tld: "numa".to_string(),
proxy_tld_suffix: ".numa".to_string(),
lan_enabled: false,
config_path: "/tmp/test-numa.toml".to_string(),
config_found: false,
config_dir: std::path::PathBuf::from("/tmp"),
data_dir: std::path::PathBuf::from("/tmp"),
tls_config: None,
upstream_mode: crate::config::UpstreamMode::Recursive,
root_hints: vec![root_hint],
srtt: RwLock::new(crate::srtt::SrttCache::new(true)),
inflight: Mutex::new(HashMap::new()),
dnssec_enabled: false,
dnssec_strict: false,
})
}
#[tokio::test]
async fn concurrent_queries_coalesce_to_single_resolution() {
// Force TCP-only so mock server works
crate::recursive::UDP_DISABLED.store(true, std::sync::atomic::Ordering::Release);
let (server_addr, query_count) = spawn_slow_dns_server(Duration::from_millis(200)).await;
let ctx = test_recursive_ctx(server_addr).await;
let src: SocketAddr = "127.0.0.1:9999".parse().unwrap();
// Fire 5 concurrent queries for the same (domain, A)
let mut handles = Vec::new();
for i in 0..5u16 {
let ctx = ctx.clone();
let buf = build_wire_query(100 + i, "coalesce-test.example.com", QueryType::A);
handles.push(tokio::spawn(async move {
handle_query(buf, src, &ctx).await
}));
}
for h in handles {
h.await.unwrap().unwrap();
}
// Only 1 resolution should have reached the upstream server
let actual = query_count.load(std::sync::atomic::Ordering::Relaxed);
assert_eq!(actual, 1, "expected 1 upstream query, got {}", actual);
// Inflight map must be empty after all queries complete
assert!(ctx.inflight.lock().unwrap().is_empty());
crate::recursive::reset_udp_state();
}
#[tokio::test]
async fn different_qtypes_not_coalesced() {
crate::recursive::UDP_DISABLED.store(true, std::sync::atomic::Ordering::Release);
let (server_addr, query_count) = spawn_slow_dns_server(Duration::from_millis(100)).await;
let ctx = test_recursive_ctx(server_addr).await;
let src: SocketAddr = "127.0.0.1:9999".parse().unwrap();
// Fire A and AAAA concurrently — should NOT coalesce
let ctx_ref = ctx.clone();
let ctx_ref2 = ctx.clone();
let buf_a = build_wire_query(200, "different-qt.example.com", QueryType::A);
let buf_aaaa = build_wire_query(201, "different-qt.example.com", QueryType::AAAA);
let h1 = tokio::spawn(async move { handle_query(buf_a, src, &ctx_ref).await });
let h2 = tokio::spawn(async move { handle_query(buf_aaaa, src, &ctx_ref2).await });
h1.await.unwrap().unwrap();
h2.await.unwrap().unwrap();
let actual = query_count.load(std::sync::atomic::Ordering::Relaxed);
assert!(actual >= 2, "A and AAAA should resolve independently, got {}", actual);
assert!(ctx.inflight.lock().unwrap().is_empty());
crate::recursive::reset_udp_state();
}
#[tokio::test]
async fn inflight_map_cleaned_after_upstream_error() {
// Server that rejects everything — no server running at all
let bogus_addr: SocketAddr = "127.0.0.1:1".parse().unwrap();
let ctx = test_recursive_ctx(bogus_addr).await;
let src: SocketAddr = "127.0.0.1:9999".parse().unwrap();
let buf = build_wire_query(300, "will-fail.example.com", QueryType::A);
let _ = handle_query(buf, src, &ctx).await;
// Map must be clean even after error
assert!(ctx.inflight.lock().unwrap().is_empty());
}
}

View File

@@ -21,7 +21,7 @@ const UDP_FAIL_THRESHOLD: u8 = 3;
static QUERY_ID: AtomicU16 = AtomicU16::new(1);
static UDP_FAILURES: std::sync::atomic::AtomicU8 = std::sync::atomic::AtomicU8::new(0);
static UDP_DISABLED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
pub(crate) static UDP_DISABLED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
fn next_id() -> u16 {
QUERY_ID.fetch_add(1, Ordering::Relaxed)

View File

@@ -4,6 +4,7 @@ pub struct ServerStats {
queries_total: u64,
queries_forwarded: u64,
queries_recursive: u64,
queries_coalesced: u64,
queries_cached: u64,
queries_blocked: u64,
queries_local: u64,
@@ -18,6 +19,7 @@ pub enum QueryPath {
Cached,
Forwarded,
Recursive,
Coalesced,
Blocked,
Overridden,
UpstreamError,
@@ -30,6 +32,7 @@ impl QueryPath {
QueryPath::Cached => "CACHED",
QueryPath::Forwarded => "FORWARD",
QueryPath::Recursive => "RECURSIVE",
QueryPath::Coalesced => "COALESCED",
QueryPath::Blocked => "BLOCKED",
QueryPath::Overridden => "OVERRIDE",
QueryPath::UpstreamError => "SERVFAIL",
@@ -45,6 +48,8 @@ impl QueryPath {
Some(QueryPath::Forwarded)
} else if s.eq_ignore_ascii_case("RECURSIVE") {
Some(QueryPath::Recursive)
} else if s.eq_ignore_ascii_case("COALESCED") {
Some(QueryPath::Coalesced)
} else if s.eq_ignore_ascii_case("BLOCKED") {
Some(QueryPath::Blocked)
} else if s.eq_ignore_ascii_case("OVERRIDE") {
@@ -69,6 +74,7 @@ impl ServerStats {
queries_total: 0,
queries_forwarded: 0,
queries_recursive: 0,
queries_coalesced: 0,
queries_cached: 0,
queries_blocked: 0,
queries_local: 0,
@@ -85,6 +91,7 @@ impl ServerStats {
QueryPath::Cached => self.queries_cached += 1,
QueryPath::Forwarded => self.queries_forwarded += 1,
QueryPath::Recursive => self.queries_recursive += 1,
QueryPath::Coalesced => self.queries_coalesced += 1,
QueryPath::Blocked => self.queries_blocked += 1,
QueryPath::Overridden => self.queries_overridden += 1,
QueryPath::UpstreamError => self.upstream_errors += 1,
@@ -106,6 +113,7 @@ impl ServerStats {
total: self.queries_total,
forwarded: self.queries_forwarded,
recursive: self.queries_recursive,
coalesced: self.queries_coalesced,
cached: self.queries_cached,
local: self.queries_local,
overridden: self.queries_overridden,
@@ -121,11 +129,12 @@ impl ServerStats {
let secs = uptime.as_secs() % 60;
log::info!(
"STATS | uptime {}h{}m{}s | total {} | fwd {} | recursive {} | cached {} | local {} | override {} | blocked {} | errors {}",
"STATS | uptime {}h{}m{}s | total {} | fwd {} | recursive {} | coalesced {} | cached {} | local {} | override {} | blocked {} | errors {}",
hours, mins, secs,
self.queries_total,
self.queries_forwarded,
self.queries_recursive,
self.queries_coalesced,
self.queries_cached,
self.queries_local,
self.queries_overridden,
@@ -140,6 +149,7 @@ pub struct StatsSnapshot {
pub total: u64,
pub forwarded: u64,
pub recursive: u64,
pub coalesced: u64,
pub cached: u64,
pub local: u64,
pub overridden: u64,