refactor(ctx): coalesce forward-path upstream queries

resolve_coalesced now takes leader_path: QueryPath and applies to all
three upstream branches (Forwarded-rule, Recursive, Upstream), not just
Recursive. Fixes thundering-herd at boot when N concurrent HTTPS setups
each trigger independent forward queries for the same upstream hostname.
This commit is contained in:
Razvan Dimescu
2026-04-21 16:18:52 +03:00
parent 60600b045f
commit 31adc31c9b

View File

@@ -209,45 +209,41 @@ pub async fn resolve_query(
{ {
// Conditional forwarding takes priority over recursive mode // Conditional forwarding takes priority over recursive mode
// (e.g. Tailscale .ts.net, VPC private zones) // (e.g. Tailscale .ts.net, VPC private zones)
upstream_transport = pool.preferred().map(|u| u.transport()); let key = (qname.clone(), qtype);
match forward_with_failover_raw( let (resp, path, err) = resolve_coalesced(
&ctx.inflight,
key,
&query,
QueryPath::Forwarded,
|| async {
let wire = forward_with_failover_raw(
raw_wire, raw_wire,
pool, pool,
&ctx.srtt, &ctx.srtt,
ctx.timeout, ctx.timeout,
ctx.hedge_delay, ctx.hedge_delay,
) )
.await .await?;
{ cache_and_parse(ctx, &qname, qtype, &wire)
Ok(resp_wire) => match cache_and_parse(ctx, &qname, qtype, &resp_wire) {
Ok(resp) => (resp, QueryPath::Forwarded, DnssecStatus::Indeterminate),
Err(e) => {
error!("{} | {:?} {} | PARSE ERROR | {}", src_addr, qtype, qname, e);
(
DnsPacket::response_from(&query, ResultCode::SERVFAIL),
QueryPath::UpstreamError,
DnssecStatus::Indeterminate,
)
}
}, },
Err(e) => {
error!(
"{} | {:?} {} | FORWARD ERROR | {}",
src_addr, qtype, qname, e
);
(
DnsPacket::response_from(&query, ResultCode::SERVFAIL),
QueryPath::UpstreamError,
DnssecStatus::Indeterminate,
) )
.await;
log_coalesced_outcome(src_addr, qtype, &qname, path, err.as_deref(), "FORWARD");
if path == QueryPath::Forwarded {
upstream_transport = pool.preferred().map(|u| u.transport());
} }
} (resp, path, DnssecStatus::Indeterminate)
} else if ctx.upstream_mode == UpstreamMode::Recursive { } else if ctx.upstream_mode == UpstreamMode::Recursive {
// Recursive resolution makes UDP hops to roots/TLDs/auths; // Recursive resolution makes UDP hops to roots/TLDs/auths;
// tag as Udp so the dashboard can aggregate plaintext-wire // tag as Udp so the dashboard can aggregate plaintext-wire
// egress honestly. Only mark on success — errors stay None. // egress honestly. Only mark on success — errors stay None.
let key = (qname.clone(), qtype); let key = (qname.clone(), qtype);
let (resp, path, err) = resolve_coalesced(&ctx.inflight, key, &query, || { let (resp, path, err) = resolve_coalesced(
&ctx.inflight,
key,
&query,
QueryPath::Recursive,
|| {
crate::recursive::resolve_recursive( crate::recursive::resolve_recursive(
&qname, &qname,
qtype, qtype,
@@ -256,59 +252,40 @@ pub async fn resolve_query(
&ctx.root_hints, &ctx.root_hints,
&ctx.srtt, &ctx.srtt,
) )
}) },
)
.await; .await;
if path == QueryPath::Coalesced { log_coalesced_outcome(src_addr, qtype, &qname, path, err.as_deref(), "RECURSIVE");
debug!("{} | {:?} {} | COALESCED", src_addr, qtype, qname); if path == QueryPath::Recursive {
} else if path == QueryPath::UpstreamError {
error!(
"{} | {:?} {} | RECURSIVE ERROR | {}",
src_addr,
qtype,
qname,
err.as_deref().unwrap_or("leader failed")
);
} else {
upstream_transport = Some(crate::stats::UpstreamTransport::Udp); upstream_transport = Some(crate::stats::UpstreamTransport::Udp);
} }
(resp, path, DnssecStatus::Indeterminate) (resp, path, DnssecStatus::Indeterminate)
} else { } else {
let pool = ctx.upstream_pool.lock().unwrap().clone(); let pool = ctx.upstream_pool.lock().unwrap().clone();
match forward_with_failover_raw( let key = (qname.clone(), qtype);
let (resp, path, err) = resolve_coalesced(
&ctx.inflight,
key,
&query,
QueryPath::Upstream,
|| async {
let wire = forward_with_failover_raw(
raw_wire, raw_wire,
&pool, &pool,
&ctx.srtt, &ctx.srtt,
ctx.timeout, ctx.timeout,
ctx.hedge_delay, ctx.hedge_delay,
) )
.await .await?;
{ cache_and_parse(ctx, &qname, qtype, &wire)
Ok(resp_wire) => match cache_and_parse(ctx, &qname, qtype, &resp_wire) {
Ok(resp) => {
upstream_transport = pool.preferred().map(|u| u.transport());
(resp, QueryPath::Upstream, DnssecStatus::Indeterminate)
}
Err(e) => {
error!("{} | {:?} {} | PARSE ERROR | {}", src_addr, qtype, qname, e);
(
DnsPacket::response_from(&query, ResultCode::SERVFAIL),
QueryPath::UpstreamError,
DnssecStatus::Indeterminate,
)
}
}, },
Err(e) => {
error!(
"{} | {:?} {} | UPSTREAM ERROR | {}",
src_addr, qtype, qname, e
);
(
DnsPacket::response_from(&query, ResultCode::SERVFAIL),
QueryPath::UpstreamError,
DnssecStatus::Indeterminate,
) )
.await;
log_coalesced_outcome(src_addr, qtype, &qname, path, err.as_deref(), "UPSTREAM");
if path == QueryPath::Upstream {
upstream_transport = pool.preferred().map(|u| u.transport());
} }
} (resp, path, DnssecStatus::Indeterminate)
} }
} }
}; };
@@ -611,11 +588,15 @@ fn acquire_inflight(inflight: &Mutex<InflightMap>, key: (String, QueryType)) ->
/// Run a resolve function with in-flight coalescing. Multiple concurrent calls /// Run a resolve function with in-flight coalescing. Multiple concurrent calls
/// for the same key share a single resolution — the first caller (leader) /// for the same key share a single resolution — the first caller (leader)
/// executes `resolve_fn`, and followers wait for the broadcast result. /// executes `resolve_fn`, and followers wait for the broadcast result. The
/// leader's successful path is tagged with `leader_path` so callers that
/// share this helper (recursive, forwarded-rule, forward-upstream) keep their
/// own observability without duplicating the inflight map.
async fn resolve_coalesced<F, Fut>( async fn resolve_coalesced<F, Fut>(
inflight: &Mutex<InflightMap>, inflight: &Mutex<InflightMap>,
key: (String, QueryType), key: (String, QueryType),
query: &DnsPacket, query: &DnsPacket,
leader_path: QueryPath,
resolve_fn: F, resolve_fn: F,
) -> (DnsPacket, QueryPath, Option<String>) ) -> (DnsPacket, QueryPath, Option<String>)
where where
@@ -644,7 +625,7 @@ where
match result { match result {
Ok(resp) => { Ok(resp) => {
let _ = tx.send(Some(resp.clone())); let _ = tx.send(Some(resp.clone()));
(resp, QueryPath::Recursive, None) (resp, leader_path, None)
} }
Err(e) => { Err(e) => {
let _ = tx.send(None); let _ = tx.send(None);
@@ -671,6 +652,33 @@ impl Drop for InflightGuard<'_> {
} }
} }
/// Emit the log lines shared by the three upstream branches (Forwarded,
/// Recursive, Upstream) after `resolve_coalesced` returns. Leader-success
/// and transport-tagging stay at the call site since they diverge per
/// branch, but the Coalesced debug and UpstreamError error are identical
/// except for the label.
fn log_coalesced_outcome(
src_addr: SocketAddr,
qtype: QueryType,
qname: &str,
path: QueryPath,
err: Option<&str>,
label: &str,
) {
match path {
QueryPath::Coalesced => debug!("{} | {:?} {} | COALESCED", src_addr, qtype, qname),
QueryPath::UpstreamError => error!(
"{} | {:?} {} | {} ERROR | {}",
src_addr,
qtype,
qname,
label,
err.unwrap_or("leader failed")
),
_ => {}
}
}
fn special_use_response(query: &DnsPacket, qname: &str, qtype: QueryType) -> DnsPacket { fn special_use_response(query: &DnsPacket, qname: &str, qtype: QueryType) -> DnsPacket {
use std::net::{Ipv4Addr, Ipv6Addr}; use std::net::{Ipv4Addr, Ipv6Addr};
if qname == "ipv4only.arpa" { if qname == "ipv4only.arpa" {
@@ -909,7 +917,7 @@ mod tests {
let key = ("coalesce.test".to_string(), QueryType::A); let key = ("coalesce.test".to_string(), QueryType::A);
let query = DnsPacket::query(100 + i, "coalesce.test", QueryType::A); let query = DnsPacket::query(100 + i, "coalesce.test", QueryType::A);
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
resolve_coalesced(&inf, key, &query, || async { resolve_coalesced(&inf, key, &query, QueryPath::Recursive, || async {
count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(200)).await; tokio::time::sleep(Duration::from_millis(200)).await;
Ok(mock_response("coalesce.test")) Ok(mock_response("coalesce.test"))
@@ -953,6 +961,7 @@ mod tests {
&inf1, &inf1,
("same.domain".to_string(), QueryType::A), ("same.domain".to_string(), QueryType::A),
&query_a, &query_a,
QueryPath::Recursive,
|| async { || async {
count1.fetch_add(1, std::sync::atomic::Ordering::Relaxed); count1.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
@@ -966,6 +975,7 @@ mod tests {
&inf2, &inf2,
("same.domain".to_string(), QueryType::AAAA), ("same.domain".to_string(), QueryType::AAAA),
&query_aaaa, &query_aaaa,
QueryPath::Recursive,
|| async { || async {
count2.fetch_add(1, std::sync::atomic::Ordering::Relaxed); count2.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
@@ -995,6 +1005,7 @@ mod tests {
&inflight, &inflight,
("will-fail.test".to_string(), QueryType::A), ("will-fail.test".to_string(), QueryType::A),
&query, &query,
QueryPath::Recursive,
|| async { Err::<DnsPacket, _>("upstream timeout".into()) }, || async { Err::<DnsPacket, _>("upstream timeout".into()) },
) )
.await; .await;
@@ -1016,6 +1027,7 @@ mod tests {
&inf, &inf,
("fail.test".to_string(), QueryType::A), ("fail.test".to_string(), QueryType::A),
&query, &query,
QueryPath::Recursive,
|| async { || async {
tokio::time::sleep(Duration::from_millis(200)).await; tokio::time::sleep(Duration::from_millis(200)).await;
Err::<DnsPacket, _>("upstream error".into()) Err::<DnsPacket, _>("upstream error".into())
@@ -1056,6 +1068,7 @@ mod tests {
&inflight, &inflight,
("question.test".to_string(), QueryType::A), ("question.test".to_string(), QueryType::A),
&query, &query,
QueryPath::Recursive,
|| async { Err::<DnsPacket, _>("fail".into()) }, || async { Err::<DnsPacket, _>("fail".into()) },
) )
.await; .await;
@@ -1080,6 +1093,7 @@ mod tests {
&inflight, &inflight,
("err-msg.test".to_string(), QueryType::A), ("err-msg.test".to_string(), QueryType::A),
&query, &query,
QueryPath::Recursive,
|| async { Err::<DnsPacket, _>("connection refused by upstream".into()) }, || async { Err::<DnsPacket, _>("connection refused by upstream".into()) },
) )
.await; .await;