From 31adc31c9b64b38ff0ce7b3847d21afd7c96fbc2 Mon Sep 17 00:00:00 2001 From: Razvan Dimescu Date: Tue, 21 Apr 2026 16:18:52 +0300 Subject: [PATCH] refactor(ctx): coalesce forward-path upstream queries resolve_coalesced now takes leader_path: QueryPath and applies to all three upstream branches (Forwarded-rule, Recursive, Upstream), not just Recursive. Fixes thundering-herd at boot when N concurrent HTTPS setups each trigger independent forward queries for the same upstream hostname. --- src/ctx.rs | 190 ++++++++++++++++++++++++++++------------------------- 1 file changed, 102 insertions(+), 88 deletions(-) diff --git a/src/ctx.rs b/src/ctx.rs index 511b678..a0c15ac 100644 --- a/src/ctx.rs +++ b/src/ctx.rs @@ -209,106 +209,83 @@ pub async fn resolve_query( { // Conditional forwarding takes priority over recursive mode // (e.g. Tailscale .ts.net, VPC private zones) - upstream_transport = pool.preferred().map(|u| u.transport()); - match forward_with_failover_raw( - raw_wire, - pool, - &ctx.srtt, - ctx.timeout, - ctx.hedge_delay, - ) - .await - { - Ok(resp_wire) => match cache_and_parse(ctx, &qname, qtype, &resp_wire) { - Ok(resp) => (resp, QueryPath::Forwarded, DnssecStatus::Indeterminate), - Err(e) => { - error!("{} | {:?} {} | PARSE ERROR | {}", src_addr, qtype, qname, e); - ( - DnsPacket::response_from(&query, ResultCode::SERVFAIL), - QueryPath::UpstreamError, - DnssecStatus::Indeterminate, - ) - } - }, - Err(e) => { - error!( - "{} | {:?} {} | FORWARD ERROR | {}", - src_addr, qtype, qname, e - ); - ( - DnsPacket::response_from(&query, ResultCode::SERVFAIL), - QueryPath::UpstreamError, - DnssecStatus::Indeterminate, + let key = (qname.clone(), qtype); + let (resp, path, err) = resolve_coalesced( + &ctx.inflight, + key, + &query, + QueryPath::Forwarded, + || async { + let wire = forward_with_failover_raw( + raw_wire, + pool, + &ctx.srtt, + ctx.timeout, + ctx.hedge_delay, ) - } + .await?; + cache_and_parse(ctx, &qname, qtype, &wire) + }, + ) + .await; + log_coalesced_outcome(src_addr, qtype, &qname, path, err.as_deref(), "FORWARD"); + if path == QueryPath::Forwarded { + upstream_transport = pool.preferred().map(|u| u.transport()); } + (resp, path, DnssecStatus::Indeterminate) } else if ctx.upstream_mode == UpstreamMode::Recursive { // Recursive resolution makes UDP hops to roots/TLDs/auths; // tag as Udp so the dashboard can aggregate plaintext-wire // egress honestly. Only mark on success — errors stay None. let key = (qname.clone(), qtype); - let (resp, path, err) = resolve_coalesced(&ctx.inflight, key, &query, || { - crate::recursive::resolve_recursive( - &qname, - qtype, - &ctx.cache, - &query, - &ctx.root_hints, - &ctx.srtt, - ) - }) + let (resp, path, err) = resolve_coalesced( + &ctx.inflight, + key, + &query, + QueryPath::Recursive, + || { + crate::recursive::resolve_recursive( + &qname, + qtype, + &ctx.cache, + &query, + &ctx.root_hints, + &ctx.srtt, + ) + }, + ) .await; - if path == QueryPath::Coalesced { - debug!("{} | {:?} {} | COALESCED", src_addr, qtype, qname); - } else if path == QueryPath::UpstreamError { - error!( - "{} | {:?} {} | RECURSIVE ERROR | {}", - src_addr, - qtype, - qname, - err.as_deref().unwrap_or("leader failed") - ); - } else { + log_coalesced_outcome(src_addr, qtype, &qname, path, err.as_deref(), "RECURSIVE"); + if path == QueryPath::Recursive { upstream_transport = Some(crate::stats::UpstreamTransport::Udp); } (resp, path, DnssecStatus::Indeterminate) } else { let pool = ctx.upstream_pool.lock().unwrap().clone(); - match forward_with_failover_raw( - raw_wire, - &pool, - &ctx.srtt, - ctx.timeout, - ctx.hedge_delay, - ) - .await - { - Ok(resp_wire) => match cache_and_parse(ctx, &qname, qtype, &resp_wire) { - Ok(resp) => { - upstream_transport = pool.preferred().map(|u| u.transport()); - (resp, QueryPath::Upstream, DnssecStatus::Indeterminate) - } - Err(e) => { - error!("{} | {:?} {} | PARSE ERROR | {}", src_addr, qtype, qname, e); - ( - DnsPacket::response_from(&query, ResultCode::SERVFAIL), - QueryPath::UpstreamError, - DnssecStatus::Indeterminate, - ) - } - }, - Err(e) => { - error!( - "{} | {:?} {} | UPSTREAM ERROR | {}", - src_addr, qtype, qname, e - ); - ( - DnsPacket::response_from(&query, ResultCode::SERVFAIL), - QueryPath::UpstreamError, - DnssecStatus::Indeterminate, + let key = (qname.clone(), qtype); + let (resp, path, err) = resolve_coalesced( + &ctx.inflight, + key, + &query, + QueryPath::Upstream, + || async { + let wire = forward_with_failover_raw( + raw_wire, + &pool, + &ctx.srtt, + ctx.timeout, + ctx.hedge_delay, ) - } + .await?; + cache_and_parse(ctx, &qname, qtype, &wire) + }, + ) + .await; + log_coalesced_outcome(src_addr, qtype, &qname, path, err.as_deref(), "UPSTREAM"); + if path == QueryPath::Upstream { + upstream_transport = pool.preferred().map(|u| u.transport()); } + (resp, path, DnssecStatus::Indeterminate) } } }; @@ -611,11 +588,15 @@ fn acquire_inflight(inflight: &Mutex, key: (String, QueryType)) -> /// Run a resolve function with in-flight coalescing. Multiple concurrent calls /// for the same key share a single resolution — the first caller (leader) -/// executes `resolve_fn`, and followers wait for the broadcast result. +/// executes `resolve_fn`, and followers wait for the broadcast result. The +/// leader's successful path is tagged with `leader_path` so callers that +/// share this helper (recursive, forwarded-rule, forward-upstream) keep their +/// own observability without duplicating the inflight map. async fn resolve_coalesced( inflight: &Mutex, key: (String, QueryType), query: &DnsPacket, + leader_path: QueryPath, resolve_fn: F, ) -> (DnsPacket, QueryPath, Option) where @@ -644,7 +625,7 @@ where match result { Ok(resp) => { let _ = tx.send(Some(resp.clone())); - (resp, QueryPath::Recursive, None) + (resp, leader_path, None) } Err(e) => { let _ = tx.send(None); @@ -671,6 +652,33 @@ impl Drop for InflightGuard<'_> { } } +/// Emit the log lines shared by the three upstream branches (Forwarded, +/// Recursive, Upstream) after `resolve_coalesced` returns. Leader-success +/// and transport-tagging stay at the call site since they diverge per +/// branch, but the Coalesced debug and UpstreamError error are identical +/// except for the label. +fn log_coalesced_outcome( + src_addr: SocketAddr, + qtype: QueryType, + qname: &str, + path: QueryPath, + err: Option<&str>, + label: &str, +) { + match path { + QueryPath::Coalesced => debug!("{} | {:?} {} | COALESCED", src_addr, qtype, qname), + QueryPath::UpstreamError => error!( + "{} | {:?} {} | {} ERROR | {}", + src_addr, + qtype, + qname, + label, + err.unwrap_or("leader failed") + ), + _ => {} + } +} + fn special_use_response(query: &DnsPacket, qname: &str, qtype: QueryType) -> DnsPacket { use std::net::{Ipv4Addr, Ipv6Addr}; if qname == "ipv4only.arpa" { @@ -909,7 +917,7 @@ mod tests { let key = ("coalesce.test".to_string(), QueryType::A); let query = DnsPacket::query(100 + i, "coalesce.test", QueryType::A); handles.push(tokio::spawn(async move { - resolve_coalesced(&inf, key, &query, || async { + resolve_coalesced(&inf, key, &query, QueryPath::Recursive, || async { count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); tokio::time::sleep(Duration::from_millis(200)).await; Ok(mock_response("coalesce.test")) @@ -953,6 +961,7 @@ mod tests { &inf1, ("same.domain".to_string(), QueryType::A), &query_a, + QueryPath::Recursive, || async { count1.fetch_add(1, std::sync::atomic::Ordering::Relaxed); tokio::time::sleep(Duration::from_millis(100)).await; @@ -966,6 +975,7 @@ mod tests { &inf2, ("same.domain".to_string(), QueryType::AAAA), &query_aaaa, + QueryPath::Recursive, || async { count2.fetch_add(1, std::sync::atomic::Ordering::Relaxed); tokio::time::sleep(Duration::from_millis(100)).await; @@ -995,6 +1005,7 @@ mod tests { &inflight, ("will-fail.test".to_string(), QueryType::A), &query, + QueryPath::Recursive, || async { Err::("upstream timeout".into()) }, ) .await; @@ -1016,6 +1027,7 @@ mod tests { &inf, ("fail.test".to_string(), QueryType::A), &query, + QueryPath::Recursive, || async { tokio::time::sleep(Duration::from_millis(200)).await; Err::("upstream error".into()) @@ -1056,6 +1068,7 @@ mod tests { &inflight, ("question.test".to_string(), QueryType::A), &query, + QueryPath::Recursive, || async { Err::("fail".into()) }, ) .await; @@ -1080,6 +1093,7 @@ mod tests { &inflight, ("err-msg.test".to_string(), QueryType::A), &query, + QueryPath::Recursive, || async { Err::("connection refused by upstream".into()) }, ) .await;