feat: DoT client, recursive optimization, bench refactor

- Add DoT forwarding client (tls://IP#hostname upstream config)
- Recursive: cache NS delegations, serve-stale (RFC 8767), parallel
  NS queries on cold, no TCP fallback on individual UDP timeouts,
  400ms NS/TCP timeout (down from 800/1500ms)
- Reduce recursive p99 from 2367ms to 402ms (vs Unbound's 148ms)
- Refactor benchmark suite: generic compare_two engine, delete
  one-off diagnostics (1969 → 750 lines)
- Code cleanup: forward_query delegates to _raw, Option<String>
  for tls_name, saturating_sub for ns_idx
This commit is contained in:
Razvan Dimescu
2026-04-12 06:22:42 +03:00
parent 7efac85836
commit 5d9a3a809b
5 changed files with 754 additions and 1400 deletions

2
Cargo.lock generated
View File

@@ -1358,7 +1358,7 @@ dependencies = [
"tokio-rustls", "tokio-rustls",
"toml", "toml",
"tower", "tower",
"webpki-roots", "webpki-roots 1.0.6",
] ]
[[package]] [[package]]

View File

@@ -5,7 +5,8 @@ api_bind_addr = "127.0.0.1"
data_dir = "/tmp/numa-bench" data_dir = "/tmp/numa-bench"
[upstream] [upstream]
mode = "recursive" mode = "forward"
address = ["https://9.9.9.9/dns-query"]
timeout_ms = 10000 timeout_ms = 10000
[cache] [cache]
@@ -15,8 +16,13 @@ max_ttl = 3600
[blocking] [blocking]
enabled = false enabled = false
[proxy]
port = 8080
tls_port = 8443
[dot] [dot]
enabled = false enabled = true
port = 8530
[mobile] [mobile]
enabled = false enabled = false

File diff suppressed because it is too large Load Diff

View File

@@ -214,15 +214,11 @@ pub async fn forward_query(
upstream: &Upstream, upstream: &Upstream,
timeout_duration: Duration, timeout_duration: Duration,
) -> Result<DnsPacket> { ) -> Result<DnsPacket> {
match upstream { let mut send_buffer = BytePacketBuffer::new();
Upstream::Udp(addr) => forward_udp(query, *addr, timeout_duration).await, query.write(&mut send_buffer)?;
Upstream::Doh { url, client } => forward_doh(query, url, client, timeout_duration).await, let data = forward_query_raw(send_buffer.filled(), upstream, timeout_duration).await?;
Upstream::Dot { let mut recv_buffer = BytePacketBuffer::from_bytes(&data);
addr, DnsPacket::from_buffer(&mut recv_buffer)
tls_name,
connector,
} => forward_dot(query, *addr, tls_name, connector, timeout_duration).await,
}
} }
pub(crate) async fn forward_udp( pub(crate) async fn forward_udp(
@@ -284,13 +280,13 @@ pub(crate) async fn forward_tcp(
DnsPacket::from_buffer(&mut recv_buffer) DnsPacket::from_buffer(&mut recv_buffer)
} }
async fn forward_dot( async fn forward_dot_raw(
query: &DnsPacket, wire: &[u8],
addr: SocketAddr, addr: SocketAddr,
tls_name: &Option<String>, tls_name: &Option<String>,
connector: &tokio_rustls::TlsConnector, connector: &tokio_rustls::TlsConnector,
timeout_duration: Duration, timeout_duration: Duration,
) -> Result<DnsPacket> { ) -> Result<Vec<u8>> {
use rustls::pki_types::ServerName; use rustls::pki_types::ServerName;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream; use tokio::net::TcpStream;
@@ -303,10 +299,6 @@ async fn forward_dot(
let tcp = timeout(timeout_duration, TcpStream::connect(addr)).await??; let tcp = timeout(timeout_duration, TcpStream::connect(addr)).await??;
let mut tls = timeout(timeout_duration, connector.connect(server_name, tcp)).await??; let mut tls = timeout(timeout_duration, connector.connect(server_name, tcp)).await??;
let mut send_buffer = BytePacketBuffer::new();
query.write(&mut send_buffer)?;
let wire = send_buffer.filled();
let mut outbuf = Vec::with_capacity(2 + wire.len()); let mut outbuf = Vec::with_capacity(2 + wire.len());
outbuf.extend_from_slice(&(wire.len() as u16).to_be_bytes()); outbuf.extend_from_slice(&(wire.len() as u16).to_be_bytes());
outbuf.extend_from_slice(wire); outbuf.extend_from_slice(wire);
@@ -319,22 +311,7 @@ async fn forward_dot(
let mut data = vec![0u8; resp_len]; let mut data = vec![0u8; resp_len];
timeout(timeout_duration, tls.read_exact(&mut data)).await??; timeout(timeout_duration, tls.read_exact(&mut data)).await??;
let mut recv_buffer = BytePacketBuffer::from_bytes(&data); Ok(data)
DnsPacket::from_buffer(&mut recv_buffer)
}
async fn forward_doh(
query: &DnsPacket,
url: &str,
client: &reqwest::Client,
timeout_duration: Duration,
) -> Result<DnsPacket> {
let mut send_buffer = BytePacketBuffer::new();
query.write(&mut send_buffer)?;
let resp_bytes = forward_doh_raw(send_buffer.filled(), url, client, timeout_duration).await?;
let mut recv_buffer = BytePacketBuffer::from_bytes(&resp_bytes);
DnsPacket::from_buffer(&mut recv_buffer)
} }
pub async fn forward_query_raw( pub async fn forward_query_raw(
@@ -345,6 +322,11 @@ pub async fn forward_query_raw(
match upstream { match upstream {
Upstream::Udp(addr) => forward_udp_raw(wire, *addr, timeout_duration).await, Upstream::Udp(addr) => forward_udp_raw(wire, *addr, timeout_duration).await,
Upstream::Doh { url, client } => forward_doh_raw(wire, url, client, timeout_duration).await, Upstream::Doh { url, client } => forward_doh_raw(wire, url, client, timeout_duration).await,
Upstream::Dot {
addr,
tls_name,
connector,
} => forward_dot_raw(wire, *addr, tls_name, connector, timeout_duration).await,
} }
} }
@@ -405,7 +387,10 @@ pub async fn forward_with_hedging_raw(
match (primary_err, secondary_err) { match (primary_err, secondary_err) {
(Some(pe), Some(_)) => return Err(pe), (Some(pe), Some(_)) => return Err(pe),
(pe, se) => { primary_err = pe; secondary_err = se; } (pe, se) => {
primary_err = pe;
secondary_err = se;
}
} }
} }
} }

View File

@@ -15,8 +15,8 @@ use crate::srtt::SrttCache;
const MAX_REFERRAL_DEPTH: u8 = 10; const MAX_REFERRAL_DEPTH: u8 = 10;
const MAX_CNAME_DEPTH: u8 = 8; const MAX_CNAME_DEPTH: u8 = 8;
const NS_QUERY_TIMEOUT: Duration = Duration::from_millis(800); const NS_QUERY_TIMEOUT: Duration = Duration::from_millis(400);
const TCP_TIMEOUT: Duration = Duration::from_millis(1500); const TCP_TIMEOUT: Duration = Duration::from_millis(400);
const UDP_FAIL_THRESHOLD: u8 = 3; const UDP_FAIL_THRESHOLD: u8 = 3;
static QUERY_ID: AtomicU16 = AtomicU16::new(1); static QUERY_ID: AtomicU16 = AtomicU16::new(1);
@@ -213,11 +213,13 @@ pub(crate) fn resolve_iterative<'a>(
ns_addrs[ns_idx], q_type, q_name, current_zone, referral_depth ns_addrs[ns_idx], q_type, q_name, current_zone, referral_depth
); );
let response = match send_query_hedged(q_name, q_type, &ns_addrs[ns_idx..], srtt).await { let response = match send_query_hedged(q_name, q_type, &ns_addrs[ns_idx..], srtt).await
{
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
debug!("recursive: NS query failed: {}", e); debug!("recursive: NS query failed: {}", e);
ns_idx += 2; // both tried, skip past them let remaining = ns_addrs.len().saturating_sub(ns_idx);
ns_idx += remaining.min(2);
continue; continue;
} }
}; };
@@ -660,7 +662,10 @@ async fn send_query_hedged(
} }
match (a_err.take(), b_err.take()) { match (a_err.take(), b_err.take()) {
(Some(e), Some(_)) => return Err(e), (Some(e), Some(_)) => return Err(e),
(a, b) => { a_err = a; b_err = b; } (a, b) => {
a_err = a;
b_err = b;
}
} }
} }
} else { } else {
@@ -739,9 +744,13 @@ async fn send_query(
"send_query: {} consecutive UDP failures — switching to TCP-first", "send_query: {} consecutive UDP failures — switching to TCP-first",
fails fails
); );
// Now that UDP is disabled, retry this query via TCP
return tcp_with_srtt(&query, server, srtt, start).await;
} }
debug!("send_query: UDP failed for {}: {}, trying TCP", server, e); // UDP works in general (priming succeeded) but this server timed out.
tcp_with_srtt(&query, server, srtt, start).await // Don't waste another 400ms on TCP — the server is unreachable.
srtt.write().unwrap().record_failure(server.ip());
Err(e)
} }
} }
} }
@@ -1021,10 +1030,10 @@ mod tests {
} }
/// TCP-only server returns authoritative answer directly. /// TCP-only server returns authoritative answer directly.
/// Verifies: UDP fails → TCP fallback → resolves. /// Verifies: when UDP is disabled, TCP-first resolves.
#[tokio::test] #[tokio::test]
async fn tcp_fallback_resolves_when_udp_blocked() { async fn tcp_fallback_resolves_when_udp_blocked() {
UDP_DISABLED.store(false, Ordering::Relaxed); UDP_DISABLED.store(true, Ordering::Relaxed);
UDP_FAILURES.store(0, Ordering::Release); UDP_FAILURES.store(0, Ordering::Release);
let server_addr = spawn_tcp_dns_server(|query| { let server_addr = spawn_tcp_dns_server(|query| {
@@ -1107,7 +1116,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn tcp_fallback_handles_nxdomain() { async fn tcp_fallback_handles_nxdomain() {
UDP_DISABLED.store(false, Ordering::Relaxed); UDP_DISABLED.store(true, Ordering::Relaxed);
UDP_FAILURES.store(0, Ordering::Release); UDP_FAILURES.store(0, Ordering::Release);
let server_addr = spawn_tcp_dns_server(|query| { let server_addr = spawn_tcp_dns_server(|query| {