3 Commits

Author SHA1 Message Date
Razvan Dimescu
bea0affdde chore: bump version to 0.7.2 2026-03-29 11:44:10 +03:00
Razvan Dimescu
bad4f25d7d docs: streamline README for clarity and scannability
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 11:42:08 +03:00
Razvan Dimescu
5f45e23f55 refactor: extract resolve_coalesced, test real code (#21)
* refactor: extract resolve_coalesced, rewrite tests against real code

Extract Disposition enum, acquire_inflight(), and resolve_coalesced()
from handle_query so coalescing logic is independently testable. Rewrite
integration tests to call resolve_coalesced directly with mock futures
instead of fighting the iterative resolver's NS chain. All 12 coalescing
tests now exercise production code paths, not tokio primitives.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: SERVFAIL echoes question section, preserve error messages

resolve_coalesced now takes &DnsPacket instead of query_id so SERVFAIL
responses use response_from (echoing question section per RFC). Error
messages preserved via Option<String> return for upstream error logging.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-29 11:14:25 +03:00
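The coalescing design this commit describes — a Disposition enum deciding leader vs. follower, with followers waiting on the leader's result — can be sketched standalone. This is a minimal illustration using std threads and channels rather than Numa's tokio broadcast; the names mirror the commit message but the types (`String` keys and results) are simplified assumptions, not the project's actual signatures.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Waiters registered for an in-flight key; the leader fans the result out.
type Inflight = Arc<Mutex<HashMap<String, Vec<Sender<Option<String>>>>>>;

enum Disposition {
    Leader,
    Follower(Receiver<Option<String>>),
}

fn acquire_inflight(inflight: &Inflight, key: &str) -> Disposition {
    let mut map = inflight.lock().unwrap();
    match map.get_mut(key) {
        // Someone is already resolving this key: register as a follower.
        Some(waiters) => {
            let (tx, rx) = channel();
            waiters.push(tx);
            Disposition::Follower(rx)
        }
        // First caller becomes the leader.
        None => {
            map.insert(key.to_string(), Vec::new());
            Disposition::Leader
        }
    }
}

fn resolve_coalesced<F>(inflight: &Inflight, key: &str, resolve_fn: F) -> Option<String>
where
    F: FnOnce() -> Option<String>,
{
    match acquire_inflight(inflight, key) {
        Disposition::Follower(rx) => rx.recv().ok().flatten(),
        Disposition::Leader => {
            // Resolve without holding the lock, then remove the entry
            // and broadcast to every waiter that joined meanwhile.
            let result = resolve_fn();
            let waiters = inflight.lock().unwrap().remove(key).unwrap_or_default();
            for tx in waiters {
                let _ = tx.send(result.clone());
            }
            result
        }
    }
}

fn main() {
    let inflight: Inflight = Arc::new(Mutex::new(HashMap::new()));
    let calls = Arc::new(Mutex::new(0u32));
    let handles: Vec<_> = (0..5)
        .map(|_| {
            let (inf, calls) = (inflight.clone(), calls.clone());
            thread::spawn(move || {
                resolve_coalesced(&inf, "example.com", || {
                    *calls.lock().unwrap() += 1;
                    thread::sleep(Duration::from_millis(100)); // coalescing window
                    Some("10.0.0.1".to_string())
                })
            })
        })
        .collect();
    for h in handles {
        assert_eq!(h.join().unwrap().as_deref(), Some("10.0.0.1"));
    }
    assert!(inflight.lock().unwrap().is_empty());
    println!("upstream resolutions: {}", *calls.lock().unwrap());
}
```

A follower that registers after the leader has already removed the entry simply becomes a new leader, so every caller gets a result either way — which is why the map being empty afterwards is the invariant the tests below assert.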
5 changed files with 309 additions and 369 deletions

Cargo.lock generated

@@ -1143,7 +1143,7 @@ dependencies = [
 [[package]]
 name = "numa"
-version = "0.7.1"
+version = "0.7.2"
 dependencies = [
  "arc-swap",
  "axum",

Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "numa"
-version = "0.7.1"
+version = "0.7.2"
 authors = ["razvandimescu <razvan@dimescu.com>"]
 edition = "2021"
 description = "Portable DNS resolver in Rust — .numa local domains, ad blocking, developer overrides, DNS-over-HTTPS"

README.md

@@ -8,189 +8,93 @@
 A portable DNS resolver in a single binary. Block ads on any network, name your local services (`frontend.numa`), and override any hostname with auto-revert — all from your laptop, no cloud account or Raspberry Pi required.
-Built from scratch in Rust. Zero DNS libraries. RFC 1035 wire protocol parsed by hand. Recursive resolution from root nameservers with full DNSSEC validation (chain-of-trust + NSEC/NSEC3 denial proofs). One ~8MB binary, no PHP, no web server, no database — everything is embedded.
+Built from scratch in Rust. Zero DNS libraries. RFC 1035 wire protocol parsed by hand. Recursive resolution from root nameservers with full DNSSEC chain-of-trust validation. One ~8MB binary, everything embedded.
 ![Numa dashboard](assets/hero-demo.gif)
 ## Quick Start
 ```bash
-# Install (pick one)
-brew install razvandimescu/tap/numa
-cargo install numa
-curl -fsSL https://raw.githubusercontent.com/razvandimescu/numa/main/install.sh | sh
-# Run (port 53 requires root)
-sudo numa
-# Try it
-dig @127.0.0.1 google.com        # ✓ resolves normally
-dig @127.0.0.1 ads.google.com    # ✗ blocked → 0.0.0.0
+brew install razvandimescu/tap/numa   # or: cargo install numa
+sudo numa                             # port 53 requires root
 ```
 Open the dashboard: **http://numa.numa** (or `http://localhost:5380`)
-### Set as system resolver
-```bash
-# Point your system DNS to Numa (saves originals for uninstall)
-sudo numa install
-# Run as a persistent service (auto-starts on boot, restarts if killed)
-sudo numa service start
-```
-To uninstall: `sudo numa service stop` removes the service, `sudo numa uninstall` restores your original DNS.
-### Upgrade
-```bash
-# From Homebrew
-brew upgrade numa
-# From source
-make deploy   # builds release, copies binary, re-signs, restarts service
-```
-### Build from source
-```bash
-git clone https://github.com/razvandimescu/numa.git && cd numa
-cargo build --release
-sudo cp target/release/numa /usr/local/bin/numa
-```
-## Why Numa
-- **Local service proxy** — `https://frontend.numa` instead of `localhost:5173`. Auto-generated TLS certs, WebSocket support for HMR. Like `/etc/hosts` but with auto TLS, a REST API, LAN discovery, and auto-revert.
-- **Path-based routing** — `app.numa/api → :5001`, `app.numa/auth → :5002`. Route URL paths to different backends with optional prefix stripping. Like nginx location blocks, zero config files.
-- **LAN service discovery** — Numa instances on the same network find each other automatically via mDNS. Access a teammate's `api.numa` from your machine. Opt-in via `[lan] enabled = true`.
-- **Developer overrides** — point any hostname to any IP, auto-reverts after N minutes. Full REST API for scripting. Built-in diagnostics: `curl localhost:5380/diagnose/example.com` tells you exactly how any domain resolves.
-- **DNS-over-HTTPS** — upstream queries encrypted via DoH. Your ISP sees HTTPS traffic, not DNS queries. Set `address = "https://9.9.9.9/dns-query"` in `[upstream]` or any DoH provider.
-- **Ad blocking that travels with you** — 385K+ domains blocked via [Hagezi Pro](https://github.com/hagezi/dns-blocklists). Works on any network: coffee shops, hotels, airports.
-- **Sub-microsecond caching** — 691ns cached round-trip, ~2.0M queries/sec throughput, zero heap allocations in the I/O path. [Benchmarks](bench/).
-- **Live dashboard** — real-time stats, query log, blocking controls, service management. LAN accessibility badges show which services are reachable from other devices.
-- **macOS, Linux, and Windows** — `numa install` configures system DNS, `numa service start` runs as launchd/systemd service.
-## Local Service Proxy
-Name your local dev services with `.numa` domains:
+Set as system DNS: `sudo numa install && sudo numa service start`
+## Local Services
+Name your dev services instead of remembering port numbers:
 ```bash
 curl -X POST localhost:5380/services \
-  -H 'Content-Type: application/json' \
   -d '{"name":"frontend","target_port":5173}'
-open http://frontend.numa   # → proxied to localhost:5173
 ```
-- **HTTPS with green lock** — auto-generated local CA + per-service TLS certs
-- **WebSocket** — Vite/webpack HMR works through the proxy
-- **Health checks** — dashboard shows green/red status per service
-- **LAN sharing** — services bound to `0.0.0.0` are automatically discoverable by other Numa instances on the network. Dashboard shows "LAN" or "local only" per service.
-- **Path-based routing** — route URL paths to different backends:
-```toml
-[[services]]
-name = "app"
-target_port = 3000
-routes = [
-  { path = "/api", port = 5001 },
-  { path = "/auth", port = 5002, strip = true },
-]
-```
-`app.numa/api/users → :5001/api/users`, `app.numa/auth/login → :5002/login` (stripped)
-- **Persistent** — services survive restarts
-- Or configure in `numa.toml`:
-```toml
-[[services]]
-name = "frontend"
-target_port = 5173
-```
-## LAN Service Discovery
-Run Numa on multiple machines. They find each other automatically:
+Now `https://frontend.numa` works in your browser — green lock, valid cert, WebSocket passthrough for HMR. No mkcert, no nginx, no `/etc/hosts`.
+Add path-based routing (`app.numa/api → :5001`), share services across machines via LAN discovery, or configure everything in [`numa.toml`](numa.toml).
+## Ad Blocking & Privacy
+385K+ domains blocked via [Hagezi Pro](https://github.com/hagezi/dns-blocklists). Works on any network — coffee shops, hotels, airports. Travels with your laptop.
+Two resolution modes: **forward** (relay to Quad9/Cloudflare via encrypted DoH) or **recursive** (resolve from root nameservers — no upstream dependency, no single entity sees your full query pattern). DNSSEC validates the full chain of trust: RRSIG signatures, DNSKEY verification, DS delegation, NSEC/NSEC3 denial proofs. [Read how it works →](https://numa.rs/blog/posts/dnssec-from-scratch.html)
+## LAN Discovery
+Run Numa on multiple machines. They find each other automatically via mDNS:
 ```
 Machine A (192.168.1.5)              Machine B (192.168.1.20)
 ┌──────────────────────┐             ┌──────────────────────┐
 │ Numa                 │    mDNS     │ Numa                 │
-│ services:            │◄───────────►│ services:            │
-│ - api (port 8000)    │  discovery  │ - grafana (3000)     │
-│ - frontend (5173)    │             │                      │
+│ - api (port 8000)    │◄───────────►│ - grafana (3000)     │
+│ - frontend (5173)    │  discovery  │                      │
 └──────────────────────┘             └──────────────────────┘
 ```
-From Machine B:
-```bash
-dig @127.0.0.1 api.numa   # → 192.168.1.5
-curl http://api.numa      # → proxied to Machine A's port 8000
-```
-Enable LAN discovery:
-```bash
-numa lan on
-```
-Or in `numa.toml`:
-```toml
-[lan]
-enabled = true
-```
-Uses standard mDNS (`_numa._tcp.local` on port 5353) — compatible with Bonjour/Avahi, silently dropped by corporate firewalls instead of triggering IPS alerts.
-**Hub mode** — don't want to install Numa on every machine? Run one instance as a shared DNS server and point other devices to it:
-```bash
-# On the hub machine, bind to LAN interface
-[server]
-bind_addr = "0.0.0.0:53"
-# On other devices, set DNS to the hub's IP
-# They get .numa resolution, ad blocking, caching — zero install
-```
+From Machine B: `curl http://api.numa` → proxied to Machine A's port 8000. Enable with `numa lan on`.
+**Hub mode**: run one instance with `bind_addr = "0.0.0.0:53"` and point other devices' DNS to it — they get ad blocking + `.numa` resolution without installing anything.
 ## How It Compares
-| | Pi-hole | AdGuard Home | NextDNS | Cloudflare | Numa |
-|---|---|---|---|---|---|
-| Local service proxy | No | No | No | No | `.numa` + HTTPS + WS |
-| Path-based routing | No | No | No | No | Prefix match + strip |
-| LAN service discovery | No | No | No | No | mDNS, opt-in |
-| Developer overrides | No | No | No | No | REST API + auto-expiry |
-| Recursive resolver | No | No | Cloud only | Cloud only | From root hints, DNSSEC |
-| Encrypted upstream (DoH) | No (needs cloudflared) | Yes | Cloud only | Cloud only | Native, single binary |
-| Portable (travels with laptop) | No (appliance) | No (appliance) | Cloud only | Cloud only | Single binary |
-| Zero config | Complex | Docker/setup | Yes | Yes | Works out of the box |
-| Ad blocking | Yes | Yes | Yes | Limited | 385K+ domains |
-| Data stays local | Yes | Yes | Cloud | Cloud | 100% local |
+| | Pi-hole | AdGuard Home | Unbound | Numa |
+|---|---|---|---|---|
+| Local service proxy + auto TLS | | | | `.numa` domains, HTTPS, WebSocket |
+| LAN service discovery | | | | mDNS, zero config |
+| Developer overrides (REST API) | | | | Auto-revert, scriptable |
+| Recursive resolver | | | Yes | Yes, with SRTT selection |
+| DNSSEC validation | | — | Yes | Yes (RSA, ECDSA, Ed25519) |
+| Ad blocking | Yes | Yes | — | 385K+ domains |
+| Web admin UI | Full | Full | — | Dashboard |
+| Encrypted upstream (DoH) | Needs cloudflared | Yes | — | Native |
+| Portable (laptop) | No (appliance) | No (appliance) | Server | Single binary |
+| Community maturity | 56K stars, 10 years | 33K stars | 20 years | New |
-## How It Works
-```
-Query → Overrides → .numa TLD → Blocklist → Local Zones → Cache → Recursive/Forward
-```
-Two resolution modes: **forward** (relay to upstream like Quad9/Cloudflare) or **recursive** (resolve from root nameservers — no upstream dependency). Set `mode = "recursive"` in `[upstream]` to resolve independently.
-No DNS libraries — no `hickory-dns`, no `trust-dns`. The wire protocol — headers, labels, compression pointers, record types — is parsed and serialized by hand. Runs on `tokio` + `axum`, async per-query task spawning.
-[Configuration reference](numa.toml)
+## Performance
+691ns cached round-trip. ~2.0M qps throughput. Zero heap allocations in the hot path. Recursive queries average 237ms after SRTT warmup (12x improvement over round-robin). ECDSA P-256 DNSSEC verification: 174ns. [Benchmarks →](bench/)
+## Learn More
+- [Blog: Implementing DNSSEC from Scratch in Rust](https://numa.rs/blog/posts/dnssec-from-scratch.html)
+- [Blog: I Built a DNS Resolver from Scratch](https://numa.rs/blog/posts/dns-from-scratch.html)
+- [Configuration reference](numa.toml) — all options documented inline
+- [REST API](src/api.rs) — 27 endpoints across overrides, cache, blocking, services, diagnostics
 ## Roadmap
-- [x] DNS proxy core — forwarding, caching, local zones
-- [x] Developer overrides — REST API with auto-expiry
-- [x] Ad blocking — 385K+ domains, live dashboard, allowlist
-- [x] System integration — macOS + Linux, launchd/systemd, Tailscale/VPN auto-discovery
-- [x] Local service proxy — `.numa` domains, HTTP/HTTPS proxy, auto TLS, WebSocket
-- [x] Path-based routing — URL prefix routing with optional strip, REST API
-- [x] LAN service discovery — mDNS auto-discovery (opt-in), cross-machine DNS + proxy
-- [x] DNS-over-HTTPS — encrypted upstream via DoH (Quad9, Cloudflare, any provider)
-- [x] Recursive resolution — resolve from root nameservers, no upstream dependency
-- [x] DNSSEC validation — chain-of-trust, NSEC/NSEC3 denial proofs, AD bit (RSA, ECDSA, Ed25519)
-- [ ] pkarr integration — self-sovereign DNS via Mainline DHT (15M nodes)
-- [ ] Global `.numa` names — self-publish, DHT-backed, first-come-first-served
+- [x] DNS forwarding, caching, ad blocking, developer overrides
+- [x] `.numa` local domains — auto TLS, path routing, WebSocket proxy
+- [x] LAN service discovery — mDNS, cross-machine DNS + proxy
+- [x] DNS-over-HTTPS — encrypted upstream
+- [x] Recursive resolution + DNSSEC — chain-of-trust, NSEC/NSEC3
+- [x] SRTT-based nameserver selection
+- [ ] pkarr integration — self-sovereign DNS via Mainline DHT
+- [ ] Global `.numa` names — DHT-backed, no registrar
 ## License


@@ -178,62 +178,29 @@ pub async fn handle_query(
             (resp, QueryPath::Cached, cached_dnssec)
         } else if ctx.upstream_mode == UpstreamMode::Recursive {
             let key = (qname.clone(), qtype);
-            let disposition = acquire_inflight(&ctx.inflight, key.clone());
-
-            match disposition {
-                Disposition::Follower(mut rx) => {
-                    debug!("{} | {:?} {} | COALESCED", src_addr, qtype, qname);
-                    match rx.recv().await {
-                        Ok(Some(mut resp)) => {
-                            resp.header.id = query.header.id;
-                            (resp, QueryPath::Coalesced, DnssecStatus::Indeterminate)
-                        }
-                        _ => (
-                            DnsPacket::response_from(&query, ResultCode::SERVFAIL),
-                            QueryPath::UpstreamError,
-                            DnssecStatus::Indeterminate,
-                        ),
-                    }
-                }
-                Disposition::Leader(tx) => {
-                    // Drop guard: remove inflight entry even on panic/cancellation
-                    let guard = InflightGuard {
-                        inflight: &ctx.inflight,
-                        key: key.clone(),
-                    };
-                    let result = crate::recursive::resolve_recursive(
-                        &qname,
-                        qtype,
-                        &ctx.cache,
-                        &query,
-                        &ctx.root_hints,
-                        &ctx.srtt,
-                    )
-                    .await;
-                    drop(guard);
-                    match result {
-                        Ok(resp) => {
-                            let _ = tx.send(Some(resp.clone()));
-                            (resp, QueryPath::Recursive, DnssecStatus::Indeterminate)
-                        }
-                        Err(e) => {
-                            let _ = tx.send(None);
-                            error!(
-                                "{} | {:?} {} | RECURSIVE ERROR | {}",
-                                src_addr, qtype, qname, e
-                            );
-                            (
-                                DnsPacket::response_from(&query, ResultCode::SERVFAIL),
-                                QueryPath::UpstreamError,
-                                DnssecStatus::Indeterminate,
-                            )
-                        }
-                    }
-                }
-            }
+            let (resp, path, err) = resolve_coalesced(&ctx.inflight, key, &query, || {
+                crate::recursive::resolve_recursive(
+                    &qname,
+                    qtype,
+                    &ctx.cache,
+                    &query,
+                    &ctx.root_hints,
+                    &ctx.srtt,
+                )
+            })
+            .await;
+            if path == QueryPath::Coalesced {
+                debug!("{} | {:?} {} | COALESCED", src_addr, qtype, qname);
+            } else if path == QueryPath::UpstreamError {
+                error!(
+                    "{} | {:?} {} | RECURSIVE ERROR | {}",
+                    src_addr,
+                    qtype,
+                    qname,
+                    err.as_deref().unwrap_or("leader failed")
+                );
+            }
+            (resp, path, DnssecStatus::Indeterminate)
         } else {
             let upstream =
                 match crate::system_dns::match_forwarding_rule(&qname, &ctx.forwarding_rules) {
@@ -432,6 +399,57 @@ fn acquire_inflight(inflight: &Mutex<InflightMap>, key: (String, QueryType)) ->
     }
 }
+
+/// Run a resolve function with in-flight coalescing. Multiple concurrent calls
+/// for the same key share a single resolution — the first caller (leader)
+/// executes `resolve_fn`, and followers wait for the broadcast result.
+async fn resolve_coalesced<F, Fut>(
+    inflight: &Mutex<InflightMap>,
+    key: (String, QueryType),
+    query: &DnsPacket,
+    resolve_fn: F,
+) -> (DnsPacket, QueryPath, Option<String>)
+where
+    F: FnOnce() -> Fut,
+    Fut: std::future::Future<Output = crate::Result<DnsPacket>>,
+{
+    let disposition = acquire_inflight(inflight, key.clone());
+    match disposition {
+        Disposition::Follower(mut rx) => match rx.recv().await {
+            Ok(Some(mut resp)) => {
+                resp.header.id = query.header.id;
+                (resp, QueryPath::Coalesced, None)
+            }
+            _ => (
+                DnsPacket::response_from(query, ResultCode::SERVFAIL),
+                QueryPath::UpstreamError,
+                None,
+            ),
+        },
+        Disposition::Leader(tx) => {
+            let guard = InflightGuard { inflight, key };
+            let result = resolve_fn().await;
+            drop(guard);
+            match result {
+                Ok(resp) => {
+                    let _ = tx.send(Some(resp.clone()));
+                    (resp, QueryPath::Recursive, None)
+                }
+                Err(e) => {
+                    let _ = tx.send(None);
+                    let err_msg = e.to_string();
+                    (
+                        DnsPacket::response_from(query, ResultCode::SERVFAIL),
+                        QueryPath::UpstreamError,
+                        Some(err_msg),
+                    )
+                }
+            }
+        }
+    }
+}
+
 struct InflightGuard<'a> {
     inflight: &'a Mutex<InflightMap>,
     key: (String, QueryType),
@@ -443,20 +461,6 @@ impl Drop for InflightGuard<'_> {
     }
 }
-
-/// Build a wire-format DNS query packet for the given domain and type.
-#[cfg(test)]
-fn build_wire_query(id: u16, domain: &str, qtype: QueryType) -> BytePacketBuffer {
-    let mut pkt = DnsPacket::new();
-    pkt.header.id = id;
-    pkt.header.recursion_desired = true;
-    pkt.header.questions = 1;
-    pkt.questions
-        .push(crate::question::DnsQuestion::new(domain.to_string(), qtype));
-    let mut buf = BytePacketBuffer::new();
-    pkt.write(&mut buf).unwrap();
-    BytePacketBuffer::from_bytes(buf.filled())
-}
-
 fn special_use_response(query: &DnsPacket, qname: &str, qtype: QueryType) -> DnsPacket {
     use std::net::{Ipv4Addr, Ipv6Addr};
     if qname == "ipv4only.arpa" {
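The second commit in this PR changes SERVFAIL construction to go through `response_from` so the question section is echoed back, per RFC 1035. The idea can be illustrated in isolation; the types below are simplified hypothetical stand-ins, not Numa's actual `DnsPacket`:

```rust
// Simplified stand-ins for the packet types — hypothetical, not Numa's structs.
#[derive(Clone, Debug, PartialEq)]
struct Question {
    name: String,
    qtype: u16,
}

#[derive(Clone, Debug)]
struct Packet {
    id: u16,
    response: bool,
    rcode: u8,
    questions: Vec<Question>,
}

const SERVFAIL: u8 = 2; // RFC 1035 RCODE 2 (server failure)

/// Build an error response from a query: copy the ID (clients match
/// responses to queries by ID) and echo the question section, as
/// RFC 1035 responses are expected to do.
fn response_from(query: &Packet, rcode: u8) -> Packet {
    Packet {
        id: query.id,
        response: true,
        rcode,
        questions: query.questions.clone(),
    }
}

fn main() {
    let query = Packet {
        id: 500,
        response: false,
        rcode: 0,
        questions: vec![Question { name: "question.test".into(), qtype: 1 }],
    };
    let resp = response_from(&query, SERVFAIL);
    assert_eq!(resp.id, 500);
    assert_eq!(resp.rcode, SERVFAIL);
    assert_eq!(resp.questions, query.questions); // question section echoed
}
```

Building the error response from a bare query ID instead would drop the question section, which is exactly the bug the commit fixes.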
@@ -495,8 +499,8 @@ fn special_use_response(query: &DnsPacket, qname: &str, qtype: QueryType) -> Dns
 mod tests {
     use super::*;
     use std::collections::HashMap;
-    use std::net::{Ipv4Addr, SocketAddr};
-    use std::sync::{Arc, Mutex, RwLock};
+    use std::net::Ipv4Addr;
+    use std::sync::{Arc, Mutex};
     use tokio::sync::broadcast;

     // ---- InflightGuard unit tests ----
@@ -669,189 +673,221 @@ mod tests {
         }
     }
-    // ---- Integration: concurrent handle_query coalescing ----
-    use tokio::io::{AsyncReadExt, AsyncWriteExt};
-    use tokio::net::TcpListener;
-
-    /// Spawn a slow TCP DNS server that delays `delay` before responding.
-    /// Returns (addr, query_count) where query_count is an Arc<AtomicU32>
-    /// tracking how many queries were actually resolved (not coalesced).
-    async fn spawn_slow_dns_server(
-        delay: Duration,
-    ) -> (SocketAddr, Arc<std::sync::atomic::AtomicU32>) {
-        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
-        let addr = listener.local_addr().unwrap();
-        let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
-        let count_clone = count.clone();
-        tokio::spawn(async move {
-            loop {
-                let (mut stream, _) = match listener.accept().await {
-                    Ok(c) => c,
-                    Err(_) => break,
-                };
-                let count = count_clone.clone();
-                let delay = delay;
-                tokio::spawn(async move {
-                    let mut len_buf = [0u8; 2];
-                    if stream.read_exact(&mut len_buf).await.is_err() {
-                        return;
-                    }
-                    let len = u16::from_be_bytes(len_buf) as usize;
-                    let mut data = vec![0u8; len];
-                    if stream.read_exact(&mut data).await.is_err() {
-                        return;
-                    }
-                    let mut buf = BytePacketBuffer::from_bytes(&data);
-                    let query = match DnsPacket::from_buffer(&mut buf) {
-                        Ok(q) => q,
-                        Err(_) => return,
-                    };
-                    count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-                    // Deliberate delay to create coalescing window
-                    tokio::time::sleep(delay).await;
-                    let mut resp = DnsPacket::response_from(&query, ResultCode::NOERROR);
-                    resp.header.authoritative_answer = true;
-                    if let Some(q) = query.questions.first() {
-                        resp.answers.push(DnsRecord::A {
-                            domain: q.name.clone(),
-                            addr: Ipv4Addr::new(10, 0, 0, 1),
-                            ttl: 300,
-                        });
-                    }
-                    let mut resp_buf = BytePacketBuffer::new();
-                    if resp.write(&mut resp_buf).is_err() {
-                        return;
-                    }
-                    let resp_bytes = resp_buf.filled();
-                    let mut out = Vec::with_capacity(2 + resp_bytes.len());
-                    out.extend_from_slice(&(resp_bytes.len() as u16).to_be_bytes());
-                    out.extend_from_slice(resp_bytes);
-                    let _ = stream.write_all(&out).await;
-                });
-            }
-        });
-        (addr, count)
-    }
-
-    async fn test_recursive_ctx(root_hint: SocketAddr) -> Arc<ServerCtx> {
-        let socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
-        Arc::new(ServerCtx {
-            socket,
-            zone_map: HashMap::new(),
-            cache: RwLock::new(crate::cache::DnsCache::new(100, 60, 86400)),
-            stats: Mutex::new(crate::stats::ServerStats::new()),
-            overrides: RwLock::new(crate::override_store::OverrideStore::new()),
-            blocklist: RwLock::new(crate::blocklist::BlocklistStore::new()),
-            query_log: Mutex::new(crate::query_log::QueryLog::new(100)),
-            services: Mutex::new(crate::service_store::ServiceStore::new()),
-            lan_peers: Mutex::new(crate::lan::PeerStore::new(90)),
-            forwarding_rules: Vec::new(),
-            upstream: Mutex::new(crate::forward::Upstream::Udp(
-                "127.0.0.1:53".parse().unwrap(),
-            )),
-            upstream_auto: false,
-            upstream_port: 53,
-            lan_ip: Mutex::new(Ipv4Addr::LOCALHOST),
-            timeout: Duration::from_secs(3),
-            proxy_tld: "numa".to_string(),
-            proxy_tld_suffix: ".numa".to_string(),
-            lan_enabled: false,
-            config_path: "/tmp/test-numa.toml".to_string(),
-            config_found: false,
-            config_dir: std::path::PathBuf::from("/tmp"),
-            data_dir: std::path::PathBuf::from("/tmp"),
-            tls_config: None,
-            upstream_mode: crate::config::UpstreamMode::Recursive,
-            root_hints: vec![root_hint],
-            srtt: RwLock::new(crate::srtt::SrttCache::new(true)),
-            inflight: Mutex::new(HashMap::new()),
-            dnssec_enabled: false,
-            dnssec_strict: false,
-        })
-    }
-
-    #[tokio::test]
-    async fn concurrent_queries_coalesce_to_single_resolution() {
-        // Force TCP-only so mock server works
-        crate::recursive::UDP_DISABLED.store(true, std::sync::atomic::Ordering::Release);
-        let (server_addr, query_count) = spawn_slow_dns_server(Duration::from_millis(200)).await;
-        let ctx = test_recursive_ctx(server_addr).await;
-        let src: SocketAddr = "127.0.0.1:9999".parse().unwrap();
-        // Fire 5 concurrent queries for the same (domain, A)
-        let mut handles = Vec::new();
-        for i in 0..5u16 {
-            let ctx = ctx.clone();
-            let buf = build_wire_query(100 + i, "coalesce-test.example.com", QueryType::A);
-            handles.push(tokio::spawn(
-                async move { handle_query(buf, src, &ctx).await },
-            ));
-        }
-        for h in handles {
-            h.await.unwrap().unwrap();
-        }
-        // Only 1 resolution should have reached the upstream server
-        let actual = query_count.load(std::sync::atomic::Ordering::Relaxed);
-        assert_eq!(actual, 1, "expected 1 upstream query, got {}", actual);
-        // Inflight map must be empty after all queries complete
-        assert!(ctx.inflight.lock().unwrap().is_empty());
-        crate::recursive::reset_udp_state();
-    }
-
-    #[tokio::test]
-    async fn different_qtypes_not_coalesced() {
-        crate::recursive::UDP_DISABLED.store(true, std::sync::atomic::Ordering::Release);
-        let (server_addr, query_count) = spawn_slow_dns_server(Duration::from_millis(100)).await;
-        let ctx = test_recursive_ctx(server_addr).await;
-        let src: SocketAddr = "127.0.0.1:9999".parse().unwrap();
-        // Fire A and AAAA concurrently — should NOT coalesce
-        let ctx_ref = ctx.clone();
-        let ctx_ref2 = ctx.clone();
-        let buf_a = build_wire_query(200, "different-qt.example.com", QueryType::A);
-        let buf_aaaa = build_wire_query(201, "different-qt.example.com", QueryType::AAAA);
-        let h1 = tokio::spawn(async move { handle_query(buf_a, src, &ctx_ref).await });
-        let h2 = tokio::spawn(async move { handle_query(buf_aaaa, src, &ctx_ref2).await });
-        h1.await.unwrap().unwrap();
-        h2.await.unwrap().unwrap();
-        let actual = query_count.load(std::sync::atomic::Ordering::Relaxed);
-        assert!(
-            actual >= 2,
-            "A and AAAA should resolve independently, got {}",
-            actual
-        );
-        assert!(ctx.inflight.lock().unwrap().is_empty());
-        crate::recursive::reset_udp_state();
-    }
-
-    #[tokio::test]
-    async fn inflight_map_cleaned_after_upstream_error() {
-        // Server that rejects everything — no server running at all
-        let bogus_addr: SocketAddr = "127.0.0.1:1".parse().unwrap();
-        let ctx = test_recursive_ctx(bogus_addr).await;
-        let src: SocketAddr = "127.0.0.1:9999".parse().unwrap();
-        let buf = build_wire_query(300, "will-fail.example.com", QueryType::A);
-        let _ = handle_query(buf, src, &ctx).await;
-        // Map must be clean even after error
-        assert!(ctx.inflight.lock().unwrap().is_empty());
-    }
+    // ---- Integration: resolve_coalesced with mock futures ----
+    fn mock_query(id: u16, domain: &str, qtype: QueryType) -> DnsPacket {
+        let mut pkt = DnsPacket::new();
+        pkt.header.id = id;
+        pkt.header.recursion_desired = true;
+        pkt.questions
+            .push(crate::question::DnsQuestion::new(domain.to_string(), qtype));
+        pkt
+    }
+
+    fn mock_response(domain: &str) -> DnsPacket {
+        let mut resp = DnsPacket::new();
+        resp.header.response = true;
+        resp.header.rescode = ResultCode::NOERROR;
+        resp.answers.push(DnsRecord::A {
+            domain: domain.to_string(),
+            addr: Ipv4Addr::new(10, 0, 0, 1),
+            ttl: 300,
+        });
+        resp
+    }
+
+    #[tokio::test]
+    async fn concurrent_queries_coalesce_to_single_resolution() {
+        let inflight = Arc::new(Mutex::new(HashMap::new()));
+        let resolve_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
+        let mut handles = Vec::new();
+        for i in 0..5u16 {
+            let count = resolve_count.clone();
+            let inf = inflight.clone();
+            let key = ("coalesce.test".to_string(), QueryType::A);
+            let query = mock_query(100 + i, "coalesce.test", QueryType::A);
+            handles.push(tokio::spawn(async move {
+                resolve_coalesced(&inf, key, &query, || async {
+                    count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                    tokio::time::sleep(Duration::from_millis(200)).await;
+                    Ok(mock_response("coalesce.test"))
+                })
+                .await
+            }));
+        }
+        let mut paths = Vec::new();
+        for h in handles {
+            let (_, path, _) = h.await.unwrap();
+            paths.push(path);
+        }
+        let actual = resolve_count.load(std::sync::atomic::Ordering::Relaxed);
+        assert_eq!(actual, 1, "expected 1 resolution, got {}", actual);
+        let recursive = paths.iter().filter(|p| **p == QueryPath::Recursive).count();
+        let coalesced = paths.iter().filter(|p| **p == QueryPath::Coalesced).count();
+        assert_eq!(recursive, 1, "expected 1 RECURSIVE, got {}", recursive);
+        assert_eq!(coalesced, 4, "expected 4 COALESCED, got {}", coalesced);
+        assert!(inflight.lock().unwrap().is_empty());
+    }
+
+    #[tokio::test]
+    async fn different_qtypes_not_coalesced() {
+        let inflight = Arc::new(Mutex::new(HashMap::new()));
+        let resolve_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
+        let inf1 = inflight.clone();
+        let inf2 = inflight.clone();
+        let count1 = resolve_count.clone();
+        let count2 = resolve_count.clone();
+        let query_a = mock_query(200, "same.domain", QueryType::A);
+        let query_aaaa = mock_query(201, "same.domain", QueryType::AAAA);
+        let h1 = tokio::spawn(async move {
+            resolve_coalesced(
+                &inf1,
+                ("same.domain".to_string(), QueryType::A),
+                &query_a,
+                || async {
+                    count1.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                    tokio::time::sleep(Duration::from_millis(100)).await;
+                    Ok(mock_response("same.domain"))
+                },
+            )
+            .await
+        });
+        let h2 = tokio::spawn(async move {
+            resolve_coalesced(
+                &inf2,
+                ("same.domain".to_string(), QueryType::AAAA),
+                &query_aaaa,
+                || async {
+                    count2.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                    tokio::time::sleep(Duration::from_millis(100)).await;
+                    Ok(mock_response("same.domain"))
+                },
+            )
+            .await
+        });
+        let (_, path1, _) = h1.await.unwrap();
+        let (_, path2, _) = h2.await.unwrap();
+        let actual = resolve_count.load(std::sync::atomic::Ordering::Relaxed);
+        assert_eq!(actual, 2, "A and AAAA should each resolve, got {}", actual);
+        assert_eq!(path1, QueryPath::Recursive);
+        assert_eq!(path2, QueryPath::Recursive);
+        assert!(inflight.lock().unwrap().is_empty());
+    }
+
+    #[tokio::test]
+    async fn inflight_map_cleaned_after_error() {
+        let inflight: Mutex<InflightMap> = Mutex::new(HashMap::new());
+        let query = mock_query(300, "will-fail.test", QueryType::A);
+        let (_, path, _) = resolve_coalesced(
+            &inflight,
+            ("will-fail.test".to_string(), QueryType::A),
+            &query,
+            || async { Err::<DnsPacket, _>("upstream timeout".into()) },
+        )
+        .await;
+        assert_eq!(path, QueryPath::UpstreamError);
+        assert!(inflight.lock().unwrap().is_empty());
+    }
+
+    #[tokio::test]
+    async fn follower_gets_servfail_when_leader_fails() {
+        let inflight = Arc::new(Mutex::new(HashMap::new()));
+        let mut handles = Vec::new();
+        for i in 0..3u16 {
+            let inf = inflight.clone();
+            let query = mock_query(400 + i, "fail.test", QueryType::A);
+            handles.push(tokio::spawn(async move {
+                resolve_coalesced(
+                    &inf,
+                    ("fail.test".to_string(), QueryType::A),
+                    &query,
+                    || async {
+                        tokio::time::sleep(Duration::from_millis(200)).await;
+                        Err::<DnsPacket, _>("upstream error".into())
+                    },
+                )
+                .await
+            }));
+        }
+        let mut paths = Vec::new();
+        for h in handles {
+            let (resp, path, _) = h.await.unwrap();
+            assert_eq!(resp.header.rescode, ResultCode::SERVFAIL);
+            assert_eq!(
+                resp.questions.len(),
+                1,
+                "SERVFAIL must echo question section"
+            );
+            assert_eq!(resp.questions[0].name, "fail.test");
+            paths.push(path);
+        }
+        let errors = paths
+            .iter()
+            .filter(|p| **p == QueryPath::UpstreamError)
+            .count();
+        assert_eq!(errors, 3, "all 3 should be UpstreamError, got {}", errors);
+        assert!(inflight.lock().unwrap().is_empty());
+    }
+
+    #[tokio::test]
+    async fn servfail_leader_includes_question_section() {
+        let inflight: Mutex<InflightMap> = Mutex::new(HashMap::new());
+        let query = mock_query(500, "question.test", QueryType::A);
+        let (resp, _, _) = resolve_coalesced(
+            &inflight,
+            ("question.test".to_string(), QueryType::A),
+            &query,
+            || async { Err::<DnsPacket, _>("fail".into()) },
+        )
+        .await;
+        assert_eq!(resp.header.rescode, ResultCode::SERVFAIL);
+        assert_eq!(
+            resp.questions.len(),
+            1,
+            "SERVFAIL must echo question section"
+        );
+        assert_eq!(resp.questions[0].name, "question.test");
+        assert_eq!(resp.questions[0].qtype, QueryType::A);
+        assert_eq!(resp.header.id, 500);
+    }
+
+    #[tokio::test]
+    async fn leader_error_preserves_message() {
+        let inflight: Mutex<InflightMap> = Mutex::new(HashMap::new());
+        let query = mock_query(700, "err-msg.test", QueryType::A);
+        let (_, path, err) = resolve_coalesced(
+            &inflight,
+            ("err-msg.test".to_string(), QueryType::A),
+            &query,
+            || async { Err::<DnsPacket, _>("connection refused by upstream".into()) },
+        )
+        .await;
+        assert_eq!(path, QueryPath::UpstreamError);
+        assert_eq!(
+            err.as_deref(),
+            Some("connection refused by upstream"),
+            "error message must be preserved for logging"
+        );
+    }
 }
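The extracted `resolve_coalesced` keeps the original `InflightGuard` so the map entry is removed even if the leader's resolution panics or its future is cancelled. That RAII pattern can be shown in isolation; this sketch uses a simplified `HashMap<String, ()>` in place of the real inflight map:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// RAII cleanup: the in-flight entry is removed when the guard drops,
/// whether the resolution returns normally, errors, or unwinds.
struct InflightGuard<'a> {
    inflight: &'a Mutex<HashMap<String, ()>>,
    key: String,
}

impl Drop for InflightGuard<'_> {
    fn drop(&mut self) {
        self.inflight.lock().unwrap().remove(&self.key);
    }
}

fn main() {
    let inflight = Mutex::new(HashMap::new());
    inflight.lock().unwrap().insert("example.com".to_string(), ());
    {
        let _guard = InflightGuard {
            inflight: &inflight,
            key: "example.com".to_string(),
        };
        // ... resolution would run here; an early `return` or panic
        // still triggers the guard's Drop.
    }
    // Guard dropped at end of scope — entry is gone.
    assert!(inflight.lock().unwrap().is_empty());
}
```

In async code, dropping a cancelled future runs `Drop` on its locals, which is exactly how the guard covers cancellation as well as panics.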


@@ -13,7 +13,7 @@ pub struct ServerStats {
     started_at: Instant,
 }

-#[derive(Clone, Copy, PartialEq, Eq)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 pub enum QueryPath {
     Local,
     Cached,
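The `Debug` derive added here is what lets the rewritten tests use `assert_eq!` on `QueryPath`: the macro formats both operands with `{:?}` when the assertion fails, so the compared type needs `Debug` alongside `PartialEq`. A minimal sketch (enum variants abbreviated):

```rust
// assert_eq! prints both operands with {:?} on failure, so the
// compared type needs Debug in addition to PartialEq.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum QueryPath {
    Local,
    Cached,
    Recursive,
    Coalesced,
}

fn main() {
    let path = QueryPath::Coalesced;
    assert_eq!(path, QueryPath::Coalesced);
    assert_ne!(path, QueryPath::Recursive);
    // Debug also enables direct formatting in test diagnostics:
    println!("{:?}", path);
}
```

Without `Debug`, `assert_eq!(path1, QueryPath::Recursive)` in the new tests would fail to compile with "`QueryPath` doesn't implement `Debug`".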