Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
882508297e | ||
|
|
2b241c5755 | ||
|
|
7510c8e068 | ||
|
|
87c321f3d4 | ||
|
|
edfccaa2b7 | ||
|
|
0c43240c01 | ||
|
|
b615a56586 | ||
|
|
7056766a84 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1143,7 +1143,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "numa"
|
||||
version = "0.6.0"
|
||||
version = "0.7.1"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"axum",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "numa"
|
||||
version = "0.6.0"
|
||||
version = "0.7.1"
|
||||
authors = ["razvandimescu <razvan@dimescu.com>"]
|
||||
edition = "2021"
|
||||
description = "Portable DNS resolver in Rust — .numa local domains, ad blocking, developer overrides, DNS-over-HTTPS"
|
||||
@@ -10,7 +10,7 @@ keywords = ["dns", "dns-server", "ad-blocking", "reverse-proxy", "developer-tool
|
||||
categories = ["network-programming", "development-tools"]
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "time"] }
|
||||
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "time", "sync"] }
|
||||
axum = "0.8"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
|
||||
8
Makefile
8
Makefile
@@ -1,4 +1,4 @@
|
||||
.PHONY: all build lint fmt check audit test coverage bench clean deploy blog
|
||||
.PHONY: all build lint fmt check audit test coverage bench clean deploy blog release
|
||||
|
||||
all: lint build test
|
||||
|
||||
@@ -33,6 +33,12 @@ blog:
|
||||
echo " $$f → site/blog/posts/$$name.html"; \
|
||||
done
|
||||
|
||||
release:
|
||||
ifndef VERSION
|
||||
$(error Usage: make release VERSION=0.8.0)
|
||||
endif
|
||||
./scripts/release.sh $(VERSION)
|
||||
|
||||
clean:
|
||||
cargo clean
|
||||
|
||||
|
||||
27
README.md
27
README.md
@@ -30,11 +30,34 @@ dig @127.0.0.1 ads.google.com # ✗ blocked → 0.0.0.0
|
||||
|
||||
Open the dashboard: **http://numa.numa** (or `http://localhost:5380`)
|
||||
|
||||
Or build from source:
|
||||
### Set as system resolver
|
||||
|
||||
```bash
|
||||
# Point your system DNS to Numa (saves originals for uninstall)
|
||||
sudo numa install
|
||||
|
||||
# Run as a persistent service (auto-starts on boot, restarts if killed)
|
||||
sudo numa service start
|
||||
```
|
||||
|
||||
To uninstall: `sudo numa service stop` removes the service, `sudo numa uninstall` restores your original DNS.
|
||||
|
||||
### Upgrade
|
||||
|
||||
```bash
|
||||
# From Homebrew
|
||||
brew upgrade numa
|
||||
|
||||
# From source
|
||||
make deploy # builds release, copies binary, re-signs, restarts service
|
||||
```
|
||||
|
||||
### Build from source
|
||||
|
||||
```bash
|
||||
git clone https://github.com/razvandimescu/numa.git && cd numa
|
||||
cargo build --release
|
||||
sudo ./target/release/numa
|
||||
sudo cp target/release/numa /usr/local/bin/numa
|
||||
```
|
||||
|
||||
## Why Numa
|
||||
|
||||
@@ -50,17 +50,7 @@ TLD priming solves this. On startup, Numa queries root for NS records of 34 comm
|
||||
|
||||
DNSSEC doesn't encrypt DNS traffic. It *signs* it. Every DNS record can have an accompanying RRSIG (signature) record. The resolver verifies the signature against the zone's DNSKEY, then verifies that DNSKEY against the parent zone's DS (delegation signer) record, walking up until it reaches the root trust anchor — a hardcoded public key that IANA publishes and the entire internet agrees on.
|
||||
|
||||
```
|
||||
cloudflare.com A 104.16.132.229
|
||||
signed by → RRSIG (key_tag=34505, algo=13, signer=cloudflare.com)
|
||||
verified with → DNSKEY (cloudflare.com, key_tag=34505, ECDSA P-256)
|
||||
vouched for by → DS (at .com, key_tag=2371, digest=SHA-256 of cloudflare's DNSKEY)
|
||||
signed by → RRSIG (key_tag=19718, signer=com)
|
||||
verified with → DNSKEY (com, key_tag=19718)
|
||||
vouched for by → DS (at root, key_tag=30909)
|
||||
signed by → RRSIG (signer=.)
|
||||
verified with → DNSKEY (., key_tag=20326) ← root trust anchor (hardcoded)
|
||||
```
|
||||
<img src="../dnssec-chain.svg" alt="DNSSEC chain of trust diagram — verifying cloudflare.com from answer through .com TLD to root trust anchor">
|
||||
|
||||
### How keys get there
|
||||
|
||||
@@ -165,11 +155,9 @@ The network fetch dominates. The crypto is noise.
|
||||
|
||||
## Surviving hostile networks
|
||||
|
||||
I deployed Numa as my system DNS and switched to a different network. Everything broke. Every query: SERVFAIL, 3-second timeout.
|
||||
I deployed Numa as my system DNS and switched networks. Everything broke — every query SERVFAIL, 3-second timeout. The ISP blocks outbound UDP port 53 to everything except whitelisted public resolvers. Root servers, TLD servers, authoritative servers — all unreachable over UDP.
|
||||
|
||||
The network probe told the story: the ISP blocks outbound UDP port 53 to all servers except a handful of whitelisted public resolvers (Google, Cloudflare). Root servers, TLD servers, authoritative servers — all unreachable over UDP. The ISP forces you onto their DNS or a blessed upstream. Recursive resolution is impossible.
|
||||
|
||||
Except TCP port 53 worked fine. And every DNS server is required to support TCP (RFC 1035 section 4.2.2). The ISP apparently only filters UDP.
|
||||
But TCP port 53 worked. Every DNS server is required to support TCP (RFC 1035 section 4.2.2). The ISP only filters UDP.
|
||||
|
||||
The fix has three parts:
|
||||
|
||||
|
||||
306
scripts/benchmark.sh
Executable file
306
scripts/benchmark.sh
Executable file
@@ -0,0 +1,306 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
API="${NUMA_API:-http://127.0.0.1:5380}"
|
||||
DNS="${NUMA_DNS:-127.0.0.1}"
|
||||
NUMA_BIN="${NUMA_BIN:-/usr/local/bin/numa}"
|
||||
LAUNCHD_PLIST="/Library/LaunchDaemons/com.numa.dns.plist"
|
||||
|
||||
DOMAINS=(
|
||||
paypal.com ebay.com zoom.us slack.com discord.com
|
||||
microsoft.com apple.com meta.com oracle.com ibm.com
|
||||
docker.com kubernetes.io prometheus.io grafana.com terraform.io
|
||||
python.org nodejs.org golang.org wikipedia.org reddit.com
|
||||
stackoverflow.com stripe.com linear.app nytimes.com bbc.co.uk
|
||||
rust-lang.org fastly.com hetzner.com uber.com airbnb.com
|
||||
notion.so figma.com netflix.com spotify.com dropbox.com
|
||||
gitlab.com twitch.tv shopify.com vercel.app mozilla.org
|
||||
)
|
||||
|
||||
stats() {
|
||||
curl -s "$API/query-log" | python3 -c "
|
||||
import sys, json
|
||||
|
||||
data = json.load(sys.stdin)
|
||||
rec = [q for q in data if q['path'] == 'RECURSIVE']
|
||||
if not rec:
|
||||
print('No recursive queries in log.')
|
||||
sys.exit()
|
||||
|
||||
vals = sorted([q['latency_ms'] for q in rec])
|
||||
n = len(vals)
|
||||
|
||||
print(f'Recursive queries: {n}')
|
||||
print(f' Avg: {sum(vals)/n:.1f}ms')
|
||||
print(f' Median: {vals[n//2]:.1f}ms')
|
||||
print(f' P95: {vals[int(n*0.95)]:.1f}ms')
|
||||
print(f' P99: {vals[int(n*0.99)]:.1f}ms')
|
||||
print(f' Min: {min(vals):.1f}ms')
|
||||
print(f' Max: {max(vals):.1f}ms')
|
||||
print(f' <100ms: {sum(1 for v in vals if v < 100)}')
|
||||
print(f' <200ms: {sum(1 for v in vals if v < 200)}')
|
||||
print(f' <500ms: {sum(1 for v in vals if v < 500)}')
|
||||
print(f' >1s: {sum(1 for v in vals if v >= 1000)}')
|
||||
print()
|
||||
print('Slowest 5:')
|
||||
for q in sorted(rec, key=lambda q: q['latency_ms'], reverse=True)[:5]:
|
||||
print(f' {q[\"latency_ms\"]:>8.1f}ms {q[\"query_type\"]:5s} {q[\"domain\"]:35s} {q[\"rescode\"]}')
|
||||
print()
|
||||
print('Fastest 5:')
|
||||
for q in sorted(rec, key=lambda q: q['latency_ms'])[:5]:
|
||||
print(f' {q[\"latency_ms\"]:>8.1f}ms {q[\"query_type\"]:5s} {q[\"domain\"]:35s} {q[\"rescode\"]}')
|
||||
"
|
||||
}
|
||||
|
||||
query_all() {
|
||||
local label="$1"
|
||||
echo "=== $label ==="
|
||||
for d in "${DOMAINS[@]}"; do
|
||||
printf " %-25s " "$d"
|
||||
dig "@$DNS" "$d" A +noall +stats 2>/dev/null | grep "Query time"
|
||||
done
|
||||
echo
|
||||
}
|
||||
|
||||
flush_cache() {
|
||||
curl -s -X DELETE "$API/cache" > /dev/null
|
||||
echo "Cache flushed ($(curl -s "$API/stats" | python3 -c "import sys,json; print(json.load(sys.stdin)['cache']['entries'])" 2>/dev/null || echo '?') entries)."
|
||||
}
|
||||
|
||||
wait_for_api() {
|
||||
local attempts=0
|
||||
while ! curl -sf "$API/health" > /dev/null 2>&1; do
|
||||
attempts=$((attempts + 1))
|
||||
if [ $attempts -ge 20 ]; then
|
||||
echo "ERROR: API not reachable at $API after 10s" >&2
|
||||
exit 1
|
||||
fi
|
||||
sleep 0.5
|
||||
done
|
||||
}
|
||||
|
||||
wait_for_priming() {
|
||||
echo -n "Waiting for TLD priming..."
|
||||
local prev=0
|
||||
local stable=0
|
||||
for _ in $(seq 1 60); do
|
||||
local entries
|
||||
entries=$(curl -s "$API/stats" | python3 -c "import sys,json; print(json.load(sys.stdin)['cache']['entries'])" 2>/dev/null || echo 0)
|
||||
if [ "$entries" -gt 0 ] && [ "$entries" = "$prev" ]; then
|
||||
stable=$((stable + 1))
|
||||
if [ $stable -ge 3 ]; then
|
||||
echo " done ($entries cache entries)."
|
||||
return
|
||||
fi
|
||||
else
|
||||
stable=0
|
||||
fi
|
||||
prev="$entries"
|
||||
sleep 1
|
||||
done
|
||||
echo " timeout (cache: $prev entries)."
|
||||
}
|
||||
|
||||
# restart_numa <config_toml_body>
|
||||
# Writes config to a temp file, stops numa (launchd or manual), starts with that config.
|
||||
restart_numa() {
|
||||
local config_body="$1"
|
||||
local tmpconf
|
||||
tmpconf=$(mktemp /tmp/numa-bench-XXXXXX)
|
||||
mv "$tmpconf" "${tmpconf}.toml"
|
||||
tmpconf="${tmpconf}.toml"
|
||||
echo "$config_body" > "$tmpconf"
|
||||
|
||||
# Stop launchd-managed numa if active
|
||||
if sudo launchctl list com.numa.dns &>/dev/null; then
|
||||
sudo launchctl unload "$LAUNCHD_PLIST" 2>/dev/null || true
|
||||
sleep 1
|
||||
fi
|
||||
|
||||
# Kill any remaining
|
||||
sudo killall numa 2>/dev/null || true
|
||||
sleep 2
|
||||
|
||||
sudo "$NUMA_BIN" "$tmpconf" &
|
||||
wait_for_api
|
||||
wait_for_priming
|
||||
echo "numa ready (pid $(pgrep numa | head -1), config: $tmpconf)."
|
||||
}
|
||||
|
||||
# Restore the launchd service
|
||||
restore_launchd() {
|
||||
sudo killall numa 2>/dev/null || true
|
||||
sleep 1
|
||||
if [ -f "$LAUNCHD_PLIST" ]; then
|
||||
sudo launchctl load "$LAUNCHD_PLIST" 2>/dev/null || true
|
||||
echo "Restored launchd service."
|
||||
fi
|
||||
}
|
||||
|
||||
run_pass() {
|
||||
local label="$1"
|
||||
flush_cache
|
||||
sleep 0.5
|
||||
query_all "$label"
|
||||
echo "=== $label — stats ==="
|
||||
stats
|
||||
}
|
||||
|
||||
case "${1:-full}" in
|
||||
cold)
|
||||
echo "--- Cold cache benchmark ---"
|
||||
run_pass "Cold SRTT + Cold cache"
|
||||
;;
|
||||
warm)
|
||||
echo "--- Warm SRTT benchmark ---"
|
||||
echo "Priming SRTT..."
|
||||
for d in "${DOMAINS[@]}"; do dig "@$DNS" "$d" A +short > /dev/null 2>&1; done
|
||||
run_pass "Warm SRTT + Cold cache"
|
||||
;;
|
||||
stats)
|
||||
stats
|
||||
;;
|
||||
compare-srtt)
|
||||
echo "============================================"
|
||||
echo " A/B: SRTT OFF vs ON (dnssec off)"
|
||||
echo "============================================"
|
||||
echo
|
||||
|
||||
restart_numa "$(cat <<'TOML'
|
||||
[upstream]
|
||||
mode = "recursive"
|
||||
srtt = false
|
||||
TOML
|
||||
)"
|
||||
echo
|
||||
run_pass "SRTT OFF"
|
||||
|
||||
echo
|
||||
echo "--------------------------------------------"
|
||||
echo
|
||||
|
||||
restart_numa "$(cat <<'TOML'
|
||||
[upstream]
|
||||
mode = "recursive"
|
||||
srtt = true
|
||||
TOML
|
||||
)"
|
||||
echo
|
||||
run_pass "SRTT ON"
|
||||
|
||||
echo
|
||||
restore_launchd
|
||||
;;
|
||||
compare-dnssec)
|
||||
echo "============================================"
|
||||
echo " A/B: DNSSEC OFF vs ON (srtt on)"
|
||||
echo "============================================"
|
||||
echo
|
||||
|
||||
restart_numa "$(cat <<'TOML'
|
||||
[upstream]
|
||||
mode = "recursive"
|
||||
srtt = true
|
||||
|
||||
[dnssec]
|
||||
enabled = false
|
||||
TOML
|
||||
)"
|
||||
echo
|
||||
run_pass "DNSSEC OFF"
|
||||
|
||||
echo
|
||||
echo "--------------------------------------------"
|
||||
echo
|
||||
|
||||
restart_numa "$(cat <<'TOML'
|
||||
[upstream]
|
||||
mode = "recursive"
|
||||
srtt = true
|
||||
|
||||
[dnssec]
|
||||
enabled = true
|
||||
TOML
|
||||
)"
|
||||
echo
|
||||
run_pass "DNSSEC ON"
|
||||
|
||||
echo
|
||||
restore_launchd
|
||||
;;
|
||||
compare-all)
|
||||
echo "============================================"
|
||||
echo " Full A/B matrix"
|
||||
echo " 1. SRTT OFF + DNSSEC OFF (baseline)"
|
||||
echo " 2. SRTT ON + DNSSEC OFF"
|
||||
echo " 3. SRTT ON + DNSSEC ON"
|
||||
echo "============================================"
|
||||
echo
|
||||
|
||||
# --- 1. Baseline ---
|
||||
restart_numa "$(cat <<'TOML'
|
||||
[upstream]
|
||||
mode = "recursive"
|
||||
srtt = false
|
||||
|
||||
[dnssec]
|
||||
enabled = false
|
||||
TOML
|
||||
)"
|
||||
echo
|
||||
run_pass "SRTT OFF + DNSSEC OFF"
|
||||
|
||||
echo
|
||||
echo "--------------------------------------------"
|
||||
echo
|
||||
|
||||
# --- 2. SRTT only ---
|
||||
restart_numa "$(cat <<'TOML'
|
||||
[upstream]
|
||||
mode = "recursive"
|
||||
srtt = true
|
||||
|
||||
[dnssec]
|
||||
enabled = false
|
||||
TOML
|
||||
)"
|
||||
echo
|
||||
run_pass "SRTT ON + DNSSEC OFF"
|
||||
|
||||
echo
|
||||
echo "--------------------------------------------"
|
||||
echo
|
||||
|
||||
# --- 3. Both ---
|
||||
restart_numa "$(cat <<'TOML'
|
||||
[upstream]
|
||||
mode = "recursive"
|
||||
srtt = true
|
||||
|
||||
[dnssec]
|
||||
enabled = true
|
||||
TOML
|
||||
)"
|
||||
echo
|
||||
run_pass "SRTT ON + DNSSEC ON"
|
||||
|
||||
echo
|
||||
restore_launchd
|
||||
;;
|
||||
full|*)
|
||||
echo "--- Full benchmark (cold → warm → SRTT-only) ---"
|
||||
echo
|
||||
|
||||
wait_for_priming
|
||||
flush_cache
|
||||
sleep 0.5
|
||||
query_all "Pass 1: Cold SRTT + Cold cache"
|
||||
|
||||
flush_cache
|
||||
sleep 0.5
|
||||
query_all "Pass 2: Warm SRTT + Cold cache"
|
||||
|
||||
echo "=== Pass 2 stats (SRTT-warm) ==="
|
||||
stats
|
||||
;;
|
||||
esac
|
||||
43
scripts/release.sh
Executable file
43
scripts/release.sh
Executable file
@@ -0,0 +1,43 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
if [ $# -ne 1 ]; then
|
||||
echo "Usage: $0 <version> (e.g. 0.7.0)" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERSION="$1"
|
||||
TAG="v$VERSION"
|
||||
|
||||
# Sanity checks
|
||||
if ! git diff --quiet || ! git diff --cached --quiet; then
|
||||
echo "ERROR: working tree is dirty — commit or stash first" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ "$(git branch --show-current)" != "main" ]; then
|
||||
echo "ERROR: must be on main branch" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if git tag -l "$TAG" | grep -q .; then
|
||||
echo "ERROR: tag $TAG already exists" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
CURRENT=$(grep '^version = ' Cargo.toml | head -1 | sed 's/version = "\(.*\)"/\1/')
|
||||
echo "Bumping $CURRENT -> $VERSION"
|
||||
|
||||
# Bump version
|
||||
sed -i.bak "s/^version = \"$CURRENT\"/version = \"$VERSION\"/" Cargo.toml
|
||||
rm -f Cargo.toml.bak
|
||||
cargo update --workspace
|
||||
|
||||
# Commit, tag, push
|
||||
git add Cargo.toml Cargo.lock
|
||||
git commit -m "chore: bump version to $VERSION"
|
||||
git tag "$TAG"
|
||||
git push origin main --tags
|
||||
|
||||
echo
|
||||
echo "Released $TAG — GitHub Actions will build, publish to crates.io, and create the release."
|
||||
136
site/blog/dnssec-chain.svg
Normal file
136
site/blog/dnssec-chain.svg
Normal file
@@ -0,0 +1,136 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 720 680" font-family="'DM Sans', system-ui, sans-serif" font-size="13">
|
||||
<defs>
|
||||
<marker id="arr" viewBox="0 0 10 10" refX="10" refY="5" markerWidth="7" markerHeight="7" orient="auto-start-reverse">
|
||||
<path d="M 0 0 L 10 5 L 0 10 z" fill="#64748b"/>
|
||||
</marker>
|
||||
<marker id="arr-amber" viewBox="0 0 10 10" refX="10" refY="5" markerWidth="7" markerHeight="7" orient="auto-start-reverse">
|
||||
<path d="M 0 0 L 10 5 L 0 10 z" fill="#c0623a"/>
|
||||
</marker>
|
||||
<marker id="arr-teal" viewBox="0 0 10 10" refX="10" refY="5" markerWidth="7" markerHeight="7" orient="auto-start-reverse">
|
||||
<path d="M 0 0 L 10 5 L 0 10 z" fill="#6b7c4e"/>
|
||||
</marker>
|
||||
<filter id="s" x="-3%" y="-3%" width="106%" height="106%">
|
||||
<feDropShadow dx="0" dy="1" stdDeviation="2" flood-opacity="0.06"/>
|
||||
</filter>
|
||||
</defs>
|
||||
|
||||
<!-- Background -->
|
||||
<rect width="720" height="680" rx="8" fill="#faf7f2"/>
|
||||
|
||||
<!-- Title -->
|
||||
<text x="360" y="36" text-anchor="middle" font-size="15" font-weight="600" fill="#2c2418" font-family="'Instrument Serif', Georgia, serif" letter-spacing="-0.02em">DNSSEC Chain of Trust</text>
|
||||
<text x="360" y="54" text-anchor="middle" font-size="11" fill="#a39888">Verifying cloudflare.com — from answer to root trust anchor</text>
|
||||
|
||||
<!-- Legend -->
|
||||
<g transform="translate(28, 72)">
|
||||
<rect width="14" height="14" rx="3" fill="#c0623a" opacity="0.15" stroke="#c0623a" stroke-width="1"/>
|
||||
<text x="20" y="12" font-size="11" fill="#6b5e4f">Verify signature (RRSIG → DNSKEY)</text>
|
||||
<rect x="230" width="14" height="14" rx="3" fill="#6b7c4e" opacity="0.15" stroke="#6b7c4e" stroke-width="1"/>
|
||||
<text x="250" y="12" font-size="11" fill="#6b5e4f">Vouch for key (DS → parent DNSKEY)</text>
|
||||
<rect x="478" width="14" height="14" rx="3" fill="#2c2418" opacity="0.08" stroke="#2c2418" stroke-opacity="0.15" stroke-width="1"/>
|
||||
<text x="498" y="12" font-size="11" fill="#6b5e4f">DNS record / key</text>
|
||||
</g>
|
||||
|
||||
<!-- ═══ ZONE: cloudflare.com ═══ -->
|
||||
<rect x="40" y="104" width="640" height="152" rx="8" fill="none" stroke="rgba(0,0,0,0.06)" stroke-dasharray="4,3"/>
|
||||
<text x="56" y="122" font-size="10" font-weight="600" fill="#a39888" letter-spacing="0.08em" font-family="'JetBrains Mono', monospace">CLOUDFLARE.COM ZONE</text>
|
||||
|
||||
<!-- A record -->
|
||||
<rect x="80" y="138" width="320" height="38" rx="6" fill="white" stroke="rgba(0,0,0,0.08)" filter="url(#s)"/>
|
||||
<text x="96" y="157" font-size="12" font-weight="600" fill="#2c2418" font-family="'JetBrains Mono', monospace">cloudflare.com A 104.16.132.229</text>
|
||||
<text x="96" y="170" font-size="10" fill="#a39888">The answer we want to verify</text>
|
||||
|
||||
<!-- RRSIG -->
|
||||
<line x1="400" y1="157" x2="440" y2="157" stroke="#c0623a" stroke-width="1.5" marker-end="url(#arr-amber)"/>
|
||||
<text x="412" y="149" font-size="9" fill="#c0623a" font-weight="600">signed by</text>
|
||||
|
||||
<rect x="445" y="138" width="220" height="38" rx="6" fill="rgba(192,98,58,0.06)" stroke="rgba(192,98,58,0.2)" filter="url(#s)"/>
|
||||
<text x="461" y="155" font-size="11" font-weight="600" fill="#9e4e2d" font-family="'JetBrains Mono', monospace">RRSIG</text>
|
||||
<text x="505" y="155" font-size="11" fill="#6b5e4f">tag=34505, algo=13</text>
|
||||
<text x="461" y="170" font-size="10" fill="#a39888">signer: cloudflare.com</text>
|
||||
|
||||
<!-- DNSKEY -->
|
||||
<rect x="80" y="192" width="320" height="50" rx="6" fill="white" stroke="rgba(0,0,0,0.08)" filter="url(#s)"/>
|
||||
<text x="96" y="211" font-size="11" font-weight="600" fill="#2c2418" font-family="'JetBrains Mono', monospace">DNSKEY</text>
|
||||
<text x="156" y="211" font-size="11" fill="#6b5e4f">cloudflare.com, tag=34505</text>
|
||||
<text x="96" y="228" font-size="11" fill="#6b7c4e" font-weight="500">ECDSA P-256</text>
|
||||
<text x="194" y="228" font-size="10" fill="#a39888">— 174ns to verify</text>
|
||||
|
||||
<!-- RRSIG → DNSKEY arrow -->
|
||||
<path d="M 555 176 L 555 192 L 400 192 L 400 200" stroke="#c0623a" stroke-width="1.5" fill="none" marker-end="url(#arr-amber)"/>
|
||||
<text x="460" y="189" font-size="9" fill="#c0623a" font-weight="600">verified with</text>
|
||||
|
||||
<!-- ═══ ZONE: .com ═══ -->
|
||||
<rect x="40" y="270" width="640" height="132" rx="8" fill="none" stroke="rgba(0,0,0,0.06)" stroke-dasharray="4,3"/>
|
||||
<text x="56" y="288" font-size="10" font-weight="600" fill="#a39888" letter-spacing="0.08em" font-family="'JetBrains Mono', monospace">.COM TLD ZONE</text>
|
||||
|
||||
<!-- DS connecting zones -->
|
||||
<line x1="240" y1="242" x2="240" y2="302" stroke="#6b7c4e" stroke-width="1.5" marker-end="url(#arr-teal)"/>
|
||||
<text x="252" y="276" font-size="9" fill="#6b7c4e" font-weight="600">vouched for by</text>
|
||||
|
||||
<!-- DS record at .com -->
|
||||
<rect x="80" y="304" width="320" height="38" rx="6" fill="rgba(107,124,78,0.06)" stroke="rgba(107,124,78,0.2)" filter="url(#s)"/>
|
||||
<text x="96" y="321" font-size="11" font-weight="600" fill="#566540" font-family="'JetBrains Mono', monospace">DS</text>
|
||||
<text x="118" y="321" font-size="11" fill="#6b5e4f">tag=2371, digest=SHA-256</text>
|
||||
<text x="96" y="336" font-size="10" fill="#a39888">hash of cloudflare.com DNSKEY</text>
|
||||
|
||||
<!-- DS signed by RRSIG -->
|
||||
<line x1="400" y1="323" x2="440" y2="323" stroke="#c0623a" stroke-width="1.5" marker-end="url(#arr-amber)"/>
|
||||
<text x="412" y="315" font-size="9" fill="#c0623a" font-weight="600">signed by</text>
|
||||
|
||||
<rect x="445" y="304" width="220" height="38" rx="6" fill="rgba(192,98,58,0.06)" stroke="rgba(192,98,58,0.2)" filter="url(#s)"/>
|
||||
<text x="461" y="321" font-size="11" font-weight="600" fill="#9e4e2d" font-family="'JetBrains Mono', monospace">RRSIG</text>
|
||||
<text x="505" y="321" font-size="11" fill="#6b5e4f">tag=19718, signer=com</text>
|
||||
|
||||
<!-- .com DNSKEY -->
|
||||
<rect x="80" y="356" width="320" height="32" rx="6" fill="white" stroke="rgba(0,0,0,0.08)" filter="url(#s)"/>
|
||||
<text x="96" y="377" font-size="11" font-weight="600" fill="#2c2418" font-family="'JetBrains Mono', monospace">DNSKEY</text>
|
||||
<text x="156" y="377" font-size="11" fill="#6b5e4f">com, tag=19718</text>
|
||||
|
||||
<!-- RRSIG → .com DNSKEY -->
|
||||
<path d="M 555 342 L 555 356 L 400 356 L 400 366" stroke="#c0623a" stroke-width="1.5" fill="none" marker-end="url(#arr-amber)"/>
|
||||
<text x="460" y="353" font-size="9" fill="#c0623a" font-weight="600">verified with</text>
|
||||
|
||||
<!-- ═══ ZONE: root ═══ -->
|
||||
<rect x="40" y="404" width="640" height="132" rx="8" fill="none" stroke="rgba(0,0,0,0.06)" stroke-dasharray="4,3"/>
|
||||
<text x="56" y="422" font-size="10" font-weight="600" fill="#a39888" letter-spacing="0.08em" font-family="'JetBrains Mono', monospace">ROOT ZONE (.)</text>
|
||||
|
||||
<!-- DS connecting .com → root -->
|
||||
<line x1="240" y1="388" x2="240" y2="436" stroke="#6b7c4e" stroke-width="1.5" marker-end="url(#arr-teal)"/>
|
||||
<text x="252" y="416" font-size="9" fill="#6b7c4e" font-weight="600">vouched for by</text>
|
||||
|
||||
<!-- DS at root -->
|
||||
<rect x="80" y="438" width="320" height="38" rx="6" fill="rgba(107,124,78,0.06)" stroke="rgba(107,124,78,0.2)" filter="url(#s)"/>
|
||||
<text x="96" y="455" font-size="11" font-weight="600" fill="#566540" font-family="'JetBrains Mono', monospace">DS</text>
|
||||
<text x="118" y="455" font-size="11" fill="#6b5e4f">tag=30909, digest=SHA-256</text>
|
||||
<text x="96" y="470" font-size="10" fill="#a39888">hash of com DNSKEY</text>
|
||||
|
||||
<!-- DS signed by root RRSIG -->
|
||||
<line x1="400" y1="457" x2="440" y2="457" stroke="#c0623a" stroke-width="1.5" marker-end="url(#arr-amber)"/>
|
||||
<text x="412" y="449" font-size="9" fill="#c0623a" font-weight="600">signed by</text>
|
||||
|
||||
<rect x="445" y="438" width="220" height="38" rx="6" fill="rgba(192,98,58,0.06)" stroke="rgba(192,98,58,0.2)" filter="url(#s)"/>
|
||||
<text x="461" y="455" font-size="11" font-weight="600" fill="#9e4e2d" font-family="'JetBrains Mono', monospace">RRSIG</text>
|
||||
<text x="505" y="455" font-size="11" fill="#6b5e4f">signer=.</text>
|
||||
|
||||
<!-- Root DNSKEY -->
|
||||
<rect x="80" y="490" width="320" height="32" rx="6" fill="white" stroke="rgba(0,0,0,0.08)" filter="url(#s)"/>
|
||||
<text x="96" y="511" font-size="11" font-weight="600" fill="#2c2418" font-family="'JetBrains Mono', monospace">DNSKEY</text>
|
||||
<text x="156" y="511" font-size="11" fill="#6b5e4f">root (.), tag=20326, RSA/SHA-256</text>
|
||||
|
||||
<!-- RRSIG → root DNSKEY -->
|
||||
<path d="M 555 476 L 555 490 L 400 490 L 400 500" stroke="#c0623a" stroke-width="1.5" fill="none" marker-end="url(#arr-amber)"/>
|
||||
<text x="460" y="487" font-size="9" fill="#c0623a" font-weight="600">verified with</text>
|
||||
|
||||
<!-- ═══ TRUST ANCHOR ═══ -->
|
||||
<line x1="240" y1="522" x2="240" y2="558" stroke="#2c2418" stroke-width="2" stroke-dasharray="4,3"/>
|
||||
|
||||
<rect x="120" y="560" width="480" height="52" rx="8" fill="#2c2418" filter="url(#s)"/>
|
||||
<text x="360" y="582" text-anchor="middle" font-size="12" font-weight="600" fill="#faf7f2" font-family="'JetBrains Mono', monospace">ROOT TRUST ANCHOR</text>
|
||||
<text x="360" y="600" text-anchor="middle" font-size="11" fill="#a39888">IANA KSK, key_tag=20326 — hardcoded in Numa as const [u8; 256]</text>
|
||||
|
||||
<!-- Flow summary -->
|
||||
<text x="360" y="646" text-anchor="middle" font-size="12" fill="#6b5e4f" font-style="italic">Trust flows up (DS records). Keys flow down (DNSKEY → RRSIG).</text>
|
||||
<text x="360" y="664" text-anchor="middle" font-size="11" fill="#a39888">If any link breaks — wrong signature, missing DS, expired RRSIG — Numa rejects the response.</text>
|
||||
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 9.2 KiB |
@@ -285,6 +285,7 @@ body {
|
||||
.path-tag.OVERRIDE { background: rgba(82, 122, 82, 0.12); color: var(--emerald); }
|
||||
.path-tag.SERVFAIL { background: rgba(181, 68, 58, 0.12); color: var(--rose); }
|
||||
.path-tag.BLOCKED { background: rgba(163, 152, 136, 0.15); color: var(--text-dim); }
|
||||
.path-tag.COALESCED { background: rgba(138, 104, 158, 0.12); color: var(--violet-dim); }
|
||||
|
||||
/* Sidebar panels */
|
||||
.sidebar {
|
||||
@@ -547,6 +548,8 @@ body {
|
||||
<select id="logFilterPath" onchange="applyLogFilter()"
|
||||
style="font-family:var(--font-mono);font-size:0.7rem;padding:0.25rem 0.4rem;border:1px solid var(--border);border-radius:4px;background:var(--bg-surface);color:var(--text-secondary);outline:none;">
|
||||
<option value="">all paths</option>
|
||||
<option value="RECURSIVE">recursive</option>
|
||||
<option value="COALESCED">coalesced</option>
|
||||
<option value="FORWARD">forward</option>
|
||||
<option value="CACHED">cached</option>
|
||||
<option value="BLOCKED">blocked</option>
|
||||
@@ -879,6 +882,10 @@ async function refresh() {
|
||||
document.getElementById('footerUpstream').textContent = stats.upstream || '';
|
||||
document.getElementById('footerConfig').textContent = stats.config_path || '';
|
||||
document.getElementById('footerData').textContent = stats.data_dir || '';
|
||||
document.getElementById('footerDnssec').textContent = stats.dnssec ? 'on' : 'off';
|
||||
document.getElementById('footerDnssec').style.color = stats.dnssec ? 'var(--emerald)' : 'var(--text-dim)';
|
||||
document.getElementById('footerSrtt').textContent = stats.srtt ? 'on' : 'off';
|
||||
document.getElementById('footerSrtt').style.color = stats.srtt ? 'var(--emerald)' : 'var(--text-dim)';
|
||||
|
||||
// LAN status indicator
|
||||
const lanEl = document.getElementById('lanToggle');
|
||||
@@ -1229,6 +1236,8 @@ setInterval(refresh, 2000);
|
||||
Config: <span id="footerConfig" style="user-select:all;color:var(--emerald);"></span>
|
||||
· Data: <span id="footerData" style="user-select:all;color:var(--emerald);"></span>
|
||||
· Upstream: <span id="footerUpstream" style="user-select:all;color:var(--emerald);"></span>
|
||||
· DNSSEC: <span id="footerDnssec" style="color:var(--text-dim);">—</span>
|
||||
· SRTT: <span id="footerSrtt" style="color:var(--text-dim);">—</span>
|
||||
· Logs: <span style="user-select:all;color:var(--emerald);">macOS: /usr/local/var/log/numa.log · Linux: journalctl -u numa -f</span>
|
||||
· <a href="https://github.com/razvandimescu/numa" target="_blank" rel="noopener" style="color:var(--amber);text-decoration:none;">GitHub</a>
|
||||
</div>
|
||||
|
||||
@@ -162,6 +162,8 @@ struct StatsResponse {
|
||||
upstream: String,
|
||||
config_path: String,
|
||||
data_dir: String,
|
||||
dnssec: bool,
|
||||
srtt: bool,
|
||||
queries: QueriesStats,
|
||||
cache: CacheStats,
|
||||
overrides: OverrideStats,
|
||||
@@ -180,6 +182,7 @@ struct QueriesStats {
|
||||
total: u64,
|
||||
forwarded: u64,
|
||||
recursive: u64,
|
||||
coalesced: u64,
|
||||
cached: u64,
|
||||
local: u64,
|
||||
overridden: u64,
|
||||
@@ -491,10 +494,13 @@ async fn stats(State(ctx): State<Arc<ServerCtx>>) -> Json<StatsResponse> {
|
||||
upstream,
|
||||
config_path: ctx.config_path.clone(),
|
||||
data_dir: ctx.data_dir.to_string_lossy().to_string(),
|
||||
dnssec: ctx.dnssec_enabled,
|
||||
srtt: ctx.srtt.read().unwrap().is_enabled(),
|
||||
queries: QueriesStats {
|
||||
total: snap.total,
|
||||
forwarded: snap.forwarded,
|
||||
recursive: snap.recursive,
|
||||
coalesced: snap.coalesced,
|
||||
cached: snap.cached,
|
||||
local: snap.local,
|
||||
overridden: snap.overridden,
|
||||
@@ -948,6 +954,8 @@ mod tests {
|
||||
tls_config: None,
|
||||
upstream_mode: crate::config::UpstreamMode::Forward,
|
||||
root_hints: Vec::new(),
|
||||
srtt: RwLock::new(crate::srtt::SrttCache::new(true)),
|
||||
inflight: Mutex::new(std::collections::HashMap::new()),
|
||||
dnssec_enabled: false,
|
||||
dnssec_strict: false,
|
||||
})
|
||||
|
||||
@@ -85,6 +85,8 @@ pub struct UpstreamConfig {
|
||||
pub root_hints: Vec<String>,
|
||||
#[serde(default = "default_prime_tlds")]
|
||||
pub prime_tlds: Vec<String>,
|
||||
#[serde(default = "default_srtt")]
|
||||
pub srtt: bool,
|
||||
}
|
||||
|
||||
impl Default for UpstreamConfig {
|
||||
@@ -96,10 +98,15 @@ impl Default for UpstreamConfig {
|
||||
timeout_ms: default_timeout_ms(),
|
||||
root_hints: default_root_hints(),
|
||||
prime_tlds: default_prime_tlds(),
|
||||
srtt: default_srtt(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn default_srtt() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn default_prime_tlds() -> Vec<String> {
|
||||
vec![
|
||||
// gTLDs
|
||||
|
||||
465
src/ctx.rs
465
src/ctx.rs
@@ -1,3 +1,4 @@
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Mutex, RwLock};
|
||||
@@ -7,6 +8,9 @@ use arc_swap::ArcSwap;
|
||||
use log::{debug, error, info, warn};
|
||||
use rustls::ServerConfig;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
type InflightMap = HashMap<(String, QueryType), broadcast::Sender<Option<DnsPacket>>>;
|
||||
|
||||
use crate::blocklist::BlocklistStore;
|
||||
use crate::buffer::BytePacketBuffer;
|
||||
@@ -21,6 +25,7 @@ use crate::query_log::{QueryLog, QueryLogEntry};
|
||||
use crate::question::QueryType;
|
||||
use crate::record::DnsRecord;
|
||||
use crate::service_store::ServiceStore;
|
||||
use crate::srtt::SrttCache;
|
||||
use crate::stats::{QueryPath, ServerStats};
|
||||
use crate::system_dns::ForwardingRule;
|
||||
|
||||
@@ -51,6 +56,8 @@ pub struct ServerCtx {
|
||||
pub tls_config: Option<ArcSwap<ServerConfig>>,
|
||||
pub upstream_mode: UpstreamMode,
|
||||
pub root_hints: Vec<SocketAddr>,
|
||||
pub srtt: RwLock<SrttCache>,
|
||||
pub inflight: Mutex<InflightMap>,
|
||||
pub dnssec_enabled: bool,
|
||||
pub dnssec_strict: bool,
|
||||
}
|
||||
@@ -170,17 +177,50 @@ pub async fn handle_query(
|
||||
}
|
||||
(resp, QueryPath::Cached, cached_dnssec)
|
||||
} else if ctx.upstream_mode == UpstreamMode::Recursive {
|
||||
match crate::recursive::resolve_recursive(
|
||||
let key = (qname.clone(), qtype);
|
||||
let disposition = acquire_inflight(&ctx.inflight, key.clone());
|
||||
|
||||
match disposition {
|
||||
Disposition::Follower(mut rx) => {
|
||||
debug!("{} | {:?} {} | COALESCED", src_addr, qtype, qname);
|
||||
match rx.recv().await {
|
||||
Ok(Some(mut resp)) => {
|
||||
resp.header.id = query.header.id;
|
||||
(resp, QueryPath::Coalesced, DnssecStatus::Indeterminate)
|
||||
}
|
||||
_ => (
|
||||
DnsPacket::response_from(&query, ResultCode::SERVFAIL),
|
||||
QueryPath::UpstreamError,
|
||||
DnssecStatus::Indeterminate,
|
||||
),
|
||||
}
|
||||
}
|
||||
Disposition::Leader(tx) => {
|
||||
// Drop guard: remove inflight entry even on panic/cancellation
|
||||
let guard = InflightGuard {
|
||||
inflight: &ctx.inflight,
|
||||
key: key.clone(),
|
||||
};
|
||||
|
||||
let result = crate::recursive::resolve_recursive(
|
||||
&qname,
|
||||
qtype,
|
||||
&ctx.cache,
|
||||
&query,
|
||||
&ctx.root_hints,
|
||||
&ctx.srtt,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(resp) => (resp, QueryPath::Recursive, DnssecStatus::Indeterminate),
|
||||
.await;
|
||||
|
||||
drop(guard);
|
||||
|
||||
match result {
|
||||
Ok(resp) => {
|
||||
let _ = tx.send(Some(resp.clone()));
|
||||
(resp, QueryPath::Recursive, DnssecStatus::Indeterminate)
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = tx.send(None);
|
||||
error!(
|
||||
"{} | {:?} {} | RECURSIVE ERROR | {}",
|
||||
src_addr, qtype, qname, e
|
||||
@@ -192,6 +232,8 @@ pub async fn handle_query(
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let upstream =
|
||||
match crate::system_dns::match_forwarding_rule(&qname, &ctx.forwarding_rules) {
|
||||
@@ -226,7 +268,8 @@ pub async fn handle_query(
|
||||
let mut dnssec = dnssec;
|
||||
if ctx.dnssec_enabled && path == QueryPath::Recursive {
|
||||
let (status, vstats) =
|
||||
crate::dnssec::validate_response(&response, &ctx.cache, &ctx.root_hints).await;
|
||||
crate::dnssec::validate_response(&response, &ctx.cache, &ctx.root_hints, &ctx.srtt)
|
||||
.await;
|
||||
|
||||
debug!(
|
||||
"DNSSEC | {} | {:?} | {}ms | dnskey_hit={} dnskey_fetch={} ds_hit={} ds_fetch={}",
|
||||
@@ -366,7 +409,52 @@ fn is_special_use_domain(qname: &str) -> bool {
|
||||
return true;
|
||||
}
|
||||
// NAT64 (RFC 8880)
|
||||
qname == "ipv4only.arpa"
|
||||
if qname == "ipv4only.arpa" {
|
||||
return true;
|
||||
}
|
||||
// RFC 6762: .local is reserved for mDNS — never forward to upstream
|
||||
qname == "local" || qname.ends_with(".local")
|
||||
}
|
||||
|
||||
enum Disposition {
|
||||
Leader(broadcast::Sender<Option<DnsPacket>>),
|
||||
Follower(broadcast::Receiver<Option<DnsPacket>>),
|
||||
}
|
||||
|
||||
fn acquire_inflight(inflight: &Mutex<InflightMap>, key: (String, QueryType)) -> Disposition {
|
||||
let mut map = inflight.lock().unwrap();
|
||||
if let Some(tx) = map.get(&key) {
|
||||
Disposition::Follower(tx.subscribe())
|
||||
} else {
|
||||
let (tx, _) = broadcast::channel::<Option<DnsPacket>>(1);
|
||||
map.insert(key, tx.clone());
|
||||
Disposition::Leader(tx)
|
||||
}
|
||||
}
|
||||
|
||||
struct InflightGuard<'a> {
|
||||
inflight: &'a Mutex<InflightMap>,
|
||||
key: (String, QueryType),
|
||||
}
|
||||
|
||||
impl Drop for InflightGuard<'_> {
|
||||
fn drop(&mut self) {
|
||||
self.inflight.lock().unwrap().remove(&self.key);
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a wire-format DNS query packet for the given domain and type.
|
||||
#[cfg(test)]
|
||||
fn build_wire_query(id: u16, domain: &str, qtype: QueryType) -> BytePacketBuffer {
|
||||
let mut pkt = DnsPacket::new();
|
||||
pkt.header.id = id;
|
||||
pkt.header.recursion_desired = true;
|
||||
pkt.header.questions = 1;
|
||||
pkt.questions
|
||||
.push(crate::question::DnsQuestion::new(domain.to_string(), qtype));
|
||||
let mut buf = BytePacketBuffer::new();
|
||||
pkt.write(&mut buf).unwrap();
|
||||
BytePacketBuffer::from_bytes(buf.filled())
|
||||
}
|
||||
|
||||
fn special_use_response(query: &DnsPacket, qname: &str, qtype: QueryType) -> DnsPacket {
|
||||
@@ -402,3 +490,368 @@ fn special_use_response(query: &DnsPacket, qname: &str, qtype: QueryType) -> Dns
|
||||
DnsPacket::response_from(query, ResultCode::NXDOMAIN)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
use std::net::{Ipv4Addr, SocketAddr};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
// ---- InflightGuard unit tests ----
|
||||
|
||||
#[test]
|
||||
fn inflight_guard_removes_key_on_drop() {
|
||||
let map: Mutex<InflightMap> = Mutex::new(HashMap::new());
|
||||
let key = ("example.com".to_string(), QueryType::A);
|
||||
let (tx, _) = broadcast::channel::<Option<DnsPacket>>(1);
|
||||
map.lock().unwrap().insert(key.clone(), tx);
|
||||
|
||||
assert_eq!(map.lock().unwrap().len(), 1);
|
||||
{
|
||||
let _guard = InflightGuard {
|
||||
inflight: &map,
|
||||
key: key.clone(),
|
||||
};
|
||||
} // guard dropped here
|
||||
assert!(map.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn inflight_guard_only_removes_own_key() {
|
||||
let map: Mutex<InflightMap> = Mutex::new(HashMap::new());
|
||||
let key_a = ("a.com".to_string(), QueryType::A);
|
||||
let key_b = ("b.com".to_string(), QueryType::A);
|
||||
let (tx_a, _) = broadcast::channel::<Option<DnsPacket>>(1);
|
||||
let (tx_b, _) = broadcast::channel::<Option<DnsPacket>>(1);
|
||||
map.lock().unwrap().insert(key_a.clone(), tx_a);
|
||||
map.lock().unwrap().insert(key_b.clone(), tx_b);
|
||||
|
||||
{
|
||||
let _guard = InflightGuard {
|
||||
inflight: &map,
|
||||
key: key_a,
|
||||
};
|
||||
}
|
||||
let m = map.lock().unwrap();
|
||||
assert_eq!(m.len(), 1);
|
||||
assert!(m.contains_key(&key_b));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn inflight_guard_same_domain_different_qtype_independent() {
|
||||
let map: Mutex<InflightMap> = Mutex::new(HashMap::new());
|
||||
let key_a = ("example.com".to_string(), QueryType::A);
|
||||
let key_aaaa = ("example.com".to_string(), QueryType::AAAA);
|
||||
let (tx_a, _) = broadcast::channel::<Option<DnsPacket>>(1);
|
||||
let (tx_aaaa, _) = broadcast::channel::<Option<DnsPacket>>(1);
|
||||
map.lock().unwrap().insert(key_a.clone(), tx_a);
|
||||
map.lock().unwrap().insert(key_aaaa.clone(), tx_aaaa);
|
||||
|
||||
{
|
||||
let _guard = InflightGuard {
|
||||
inflight: &map,
|
||||
key: key_a,
|
||||
};
|
||||
}
|
||||
let m = map.lock().unwrap();
|
||||
assert_eq!(m.len(), 1);
|
||||
assert!(m.contains_key(&key_aaaa));
|
||||
}
|
||||
|
||||
// ---- Coalescing disposition tests (via acquire_inflight) ----
|
||||
|
||||
#[test]
|
||||
fn first_caller_becomes_leader() {
|
||||
let map: Mutex<InflightMap> = Mutex::new(HashMap::new());
|
||||
let key = ("test.com".to_string(), QueryType::A);
|
||||
|
||||
let d = acquire_inflight(&map, key.clone());
|
||||
assert!(matches!(d, Disposition::Leader(_)));
|
||||
assert_eq!(map.lock().unwrap().len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn second_caller_becomes_follower() {
|
||||
let map: Mutex<InflightMap> = Mutex::new(HashMap::new());
|
||||
let key = ("test.com".to_string(), QueryType::A);
|
||||
|
||||
let _leader = acquire_inflight(&map, key.clone());
|
||||
let follower = acquire_inflight(&map, key);
|
||||
assert!(matches!(follower, Disposition::Follower(_)));
|
||||
// Map still has exactly 1 entry — follower subscribes, doesn't insert
|
||||
assert_eq!(map.lock().unwrap().len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn leader_broadcast_reaches_follower() {
|
||||
let map: Mutex<InflightMap> = Mutex::new(HashMap::new());
|
||||
let key = ("test.com".to_string(), QueryType::A);
|
||||
|
||||
let leader = acquire_inflight(&map, key.clone());
|
||||
let follower = acquire_inflight(&map, key);
|
||||
|
||||
let tx = match leader {
|
||||
Disposition::Leader(tx) => tx,
|
||||
_ => panic!("expected leader"),
|
||||
};
|
||||
let mut rx = match follower {
|
||||
Disposition::Follower(rx) => rx,
|
||||
_ => panic!("expected follower"),
|
||||
};
|
||||
|
||||
let mut resp = DnsPacket::new();
|
||||
resp.header.id = 42;
|
||||
resp.answers.push(DnsRecord::A {
|
||||
domain: "test.com".into(),
|
||||
addr: Ipv4Addr::new(1, 2, 3, 4),
|
||||
ttl: 300,
|
||||
});
|
||||
let _ = tx.send(Some(resp));
|
||||
|
||||
let received = rx.recv().await.unwrap().unwrap();
|
||||
assert_eq!(received.header.id, 42);
|
||||
assert_eq!(received.answers.len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn leader_none_signals_failure_to_follower() {
|
||||
let map: Mutex<InflightMap> = Mutex::new(HashMap::new());
|
||||
let key = ("test.com".to_string(), QueryType::A);
|
||||
|
||||
let leader = acquire_inflight(&map, key.clone());
|
||||
let follower = acquire_inflight(&map, key);
|
||||
|
||||
let tx = match leader {
|
||||
Disposition::Leader(tx) => tx,
|
||||
_ => panic!("expected leader"),
|
||||
};
|
||||
let mut rx = match follower {
|
||||
Disposition::Follower(rx) => rx,
|
||||
_ => panic!("expected follower"),
|
||||
};
|
||||
|
||||
let _ = tx.send(None);
|
||||
assert!(rx.recv().await.unwrap().is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multiple_followers_all_receive_via_acquire() {
|
||||
let map: Mutex<InflightMap> = Mutex::new(HashMap::new());
|
||||
let key = ("multi.com".to_string(), QueryType::A);
|
||||
|
||||
let leader = acquire_inflight(&map, key.clone());
|
||||
let f1 = acquire_inflight(&map, key.clone());
|
||||
let f2 = acquire_inflight(&map, key.clone());
|
||||
let f3 = acquire_inflight(&map, key);
|
||||
|
||||
let tx = match leader {
|
||||
Disposition::Leader(tx) => tx,
|
||||
_ => panic!("expected leader"),
|
||||
};
|
||||
|
||||
let mut resp = DnsPacket::new();
|
||||
resp.answers.push(DnsRecord::A {
|
||||
domain: "multi.com".into(),
|
||||
addr: Ipv4Addr::new(10, 0, 0, 1),
|
||||
ttl: 60,
|
||||
});
|
||||
let _ = tx.send(Some(resp));
|
||||
|
||||
for f in [f1, f2, f3] {
|
||||
let mut rx = match f {
|
||||
Disposition::Follower(rx) => rx,
|
||||
_ => panic!("expected follower"),
|
||||
};
|
||||
let r = rx.recv().await.unwrap().unwrap();
|
||||
assert_eq!(r.answers.len(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Integration: concurrent handle_query coalescing ----
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
/// Spawn a slow TCP DNS server that delays `delay` before responding.
|
||||
/// Returns (addr, query_count) where query_count is an Arc<AtomicU32>
|
||||
/// tracking how many queries were actually resolved (not coalesced).
|
||||
async fn spawn_slow_dns_server(
|
||||
delay: Duration,
|
||||
) -> (SocketAddr, Arc<std::sync::atomic::AtomicU32>) {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
|
||||
let count_clone = count.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let (mut stream, _) = match listener.accept().await {
|
||||
Ok(c) => c,
|
||||
Err(_) => break,
|
||||
};
|
||||
let count = count_clone.clone();
|
||||
let delay = delay;
|
||||
tokio::spawn(async move {
|
||||
let mut len_buf = [0u8; 2];
|
||||
if stream.read_exact(&mut len_buf).await.is_err() {
|
||||
return;
|
||||
}
|
||||
let len = u16::from_be_bytes(len_buf) as usize;
|
||||
let mut data = vec![0u8; len];
|
||||
if stream.read_exact(&mut data).await.is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut buf = BytePacketBuffer::from_bytes(&data);
|
||||
let query = match DnsPacket::from_buffer(&mut buf) {
|
||||
Ok(q) => q,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
// Deliberate delay to create coalescing window
|
||||
tokio::time::sleep(delay).await;
|
||||
|
||||
let mut resp = DnsPacket::response_from(&query, ResultCode::NOERROR);
|
||||
resp.header.authoritative_answer = true;
|
||||
if let Some(q) = query.questions.first() {
|
||||
resp.answers.push(DnsRecord::A {
|
||||
domain: q.name.clone(),
|
||||
addr: Ipv4Addr::new(10, 0, 0, 1),
|
||||
ttl: 300,
|
||||
});
|
||||
}
|
||||
|
||||
let mut resp_buf = BytePacketBuffer::new();
|
||||
if resp.write(&mut resp_buf).is_err() {
|
||||
return;
|
||||
}
|
||||
let resp_bytes = resp_buf.filled();
|
||||
let mut out = Vec::with_capacity(2 + resp_bytes.len());
|
||||
out.extend_from_slice(&(resp_bytes.len() as u16).to_be_bytes());
|
||||
out.extend_from_slice(resp_bytes);
|
||||
let _ = stream.write_all(&out).await;
|
||||
});
|
||||
}
|
||||
});
|
||||
(addr, count)
|
||||
}
|
||||
|
||||
async fn test_recursive_ctx(root_hint: SocketAddr) -> Arc<ServerCtx> {
|
||||
let socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
|
||||
Arc::new(ServerCtx {
|
||||
socket,
|
||||
zone_map: HashMap::new(),
|
||||
cache: RwLock::new(crate::cache::DnsCache::new(100, 60, 86400)),
|
||||
stats: Mutex::new(crate::stats::ServerStats::new()),
|
||||
overrides: RwLock::new(crate::override_store::OverrideStore::new()),
|
||||
blocklist: RwLock::new(crate::blocklist::BlocklistStore::new()),
|
||||
query_log: Mutex::new(crate::query_log::QueryLog::new(100)),
|
||||
services: Mutex::new(crate::service_store::ServiceStore::new()),
|
||||
lan_peers: Mutex::new(crate::lan::PeerStore::new(90)),
|
||||
forwarding_rules: Vec::new(),
|
||||
upstream: Mutex::new(crate::forward::Upstream::Udp(
|
||||
"127.0.0.1:53".parse().unwrap(),
|
||||
)),
|
||||
upstream_auto: false,
|
||||
upstream_port: 53,
|
||||
lan_ip: Mutex::new(Ipv4Addr::LOCALHOST),
|
||||
timeout: Duration::from_secs(3),
|
||||
proxy_tld: "numa".to_string(),
|
||||
proxy_tld_suffix: ".numa".to_string(),
|
||||
lan_enabled: false,
|
||||
config_path: "/tmp/test-numa.toml".to_string(),
|
||||
config_found: false,
|
||||
config_dir: std::path::PathBuf::from("/tmp"),
|
||||
data_dir: std::path::PathBuf::from("/tmp"),
|
||||
tls_config: None,
|
||||
upstream_mode: crate::config::UpstreamMode::Recursive,
|
||||
root_hints: vec![root_hint],
|
||||
srtt: RwLock::new(crate::srtt::SrttCache::new(true)),
|
||||
inflight: Mutex::new(HashMap::new()),
|
||||
dnssec_enabled: false,
|
||||
dnssec_strict: false,
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn concurrent_queries_coalesce_to_single_resolution() {
|
||||
// Force TCP-only so mock server works
|
||||
crate::recursive::UDP_DISABLED.store(true, std::sync::atomic::Ordering::Release);
|
||||
|
||||
let (server_addr, query_count) = spawn_slow_dns_server(Duration::from_millis(200)).await;
|
||||
let ctx = test_recursive_ctx(server_addr).await;
|
||||
let src: SocketAddr = "127.0.0.1:9999".parse().unwrap();
|
||||
|
||||
// Fire 5 concurrent queries for the same (domain, A)
|
||||
let mut handles = Vec::new();
|
||||
for i in 0..5u16 {
|
||||
let ctx = ctx.clone();
|
||||
let buf = build_wire_query(100 + i, "coalesce-test.example.com", QueryType::A);
|
||||
handles.push(tokio::spawn(
|
||||
async move { handle_query(buf, src, &ctx).await },
|
||||
));
|
||||
}
|
||||
|
||||
for h in handles {
|
||||
h.await.unwrap().unwrap();
|
||||
}
|
||||
|
||||
// Only 1 resolution should have reached the upstream server
|
||||
let actual = query_count.load(std::sync::atomic::Ordering::Relaxed);
|
||||
assert_eq!(actual, 1, "expected 1 upstream query, got {}", actual);
|
||||
|
||||
// Inflight map must be empty after all queries complete
|
||||
assert!(ctx.inflight.lock().unwrap().is_empty());
|
||||
|
||||
crate::recursive::reset_udp_state();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn different_qtypes_not_coalesced() {
|
||||
crate::recursive::UDP_DISABLED.store(true, std::sync::atomic::Ordering::Release);
|
||||
|
||||
let (server_addr, query_count) = spawn_slow_dns_server(Duration::from_millis(100)).await;
|
||||
let ctx = test_recursive_ctx(server_addr).await;
|
||||
let src: SocketAddr = "127.0.0.1:9999".parse().unwrap();
|
||||
|
||||
// Fire A and AAAA concurrently — should NOT coalesce
|
||||
let ctx_ref = ctx.clone();
|
||||
let ctx_ref2 = ctx.clone();
|
||||
let buf_a = build_wire_query(200, "different-qt.example.com", QueryType::A);
|
||||
let buf_aaaa = build_wire_query(201, "different-qt.example.com", QueryType::AAAA);
|
||||
|
||||
let h1 = tokio::spawn(async move { handle_query(buf_a, src, &ctx_ref).await });
|
||||
let h2 = tokio::spawn(async move { handle_query(buf_aaaa, src, &ctx_ref2).await });
|
||||
|
||||
h1.await.unwrap().unwrap();
|
||||
h2.await.unwrap().unwrap();
|
||||
|
||||
let actual = query_count.load(std::sync::atomic::Ordering::Relaxed);
|
||||
assert!(
|
||||
actual >= 2,
|
||||
"A and AAAA should resolve independently, got {}",
|
||||
actual
|
||||
);
|
||||
assert!(ctx.inflight.lock().unwrap().is_empty());
|
||||
|
||||
crate::recursive::reset_udp_state();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn inflight_map_cleaned_after_upstream_error() {
|
||||
// Server that rejects everything — no server running at all
|
||||
let bogus_addr: SocketAddr = "127.0.0.1:1".parse().unwrap();
|
||||
let ctx = test_recursive_ctx(bogus_addr).await;
|
||||
let src: SocketAddr = "127.0.0.1:9999".parse().unwrap();
|
||||
|
||||
let buf = build_wire_query(300, "will-fail.example.com", QueryType::A);
|
||||
let _ = handle_query(buf, src, &ctx).await;
|
||||
|
||||
// Map must be clean even after error
|
||||
assert!(ctx.inflight.lock().unwrap().is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::cache::{DnsCache, DnssecStatus};
|
||||
use crate::packet::DnsPacket;
|
||||
use crate::question::QueryType;
|
||||
use crate::record::DnsRecord;
|
||||
use crate::srtt::SrttCache;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ValidationStats {
|
||||
@@ -64,6 +65,7 @@ pub async fn validate_response(
|
||||
response: &DnsPacket,
|
||||
cache: &RwLock<DnsCache>,
|
||||
root_hints: &[std::net::SocketAddr],
|
||||
srtt: &RwLock<SrttCache>,
|
||||
) -> (DnssecStatus, ValidationStats) {
|
||||
let start = Instant::now();
|
||||
let stats = Mutex::new(ValidationStats::default());
|
||||
@@ -95,7 +97,7 @@ pub async fn validate_response(
|
||||
}
|
||||
}
|
||||
for zone in &signer_zones {
|
||||
fetch_dnskeys(zone, cache, root_hints, &stats).await;
|
||||
fetch_dnskeys(zone, cache, root_hints, srtt, &stats).await;
|
||||
}
|
||||
|
||||
// Group answer records into RRsets (by domain + type, excluding RRSIGs)
|
||||
@@ -132,7 +134,8 @@ pub async fn validate_response(
|
||||
..
|
||||
} = rrsig
|
||||
{
|
||||
let dnskey_response = fetch_dnskeys(signer_name, cache, root_hints, &stats).await;
|
||||
let dnskey_response =
|
||||
fetch_dnskeys(signer_name, cache, root_hints, srtt, &stats).await;
|
||||
let dnskeys: Vec<&DnsRecord> = dnskey_response
|
||||
.iter()
|
||||
.filter(|r| matches!(r, DnsRecord::DNSKEY { .. }))
|
||||
@@ -206,6 +209,7 @@ pub async fn validate_response(
|
||||
&dnskey_response,
|
||||
cache,
|
||||
root_hints,
|
||||
srtt,
|
||||
trust_anchors,
|
||||
0,
|
||||
&stats,
|
||||
@@ -276,11 +280,13 @@ pub async fn validate_response(
|
||||
|
||||
/// Walk the chain of trust from zone DNSKEY up to root trust anchor.
|
||||
/// `zone_records` contains both DNSKEY and RRSIG records from the DNSKEY response.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn validate_chain<'a>(
|
||||
zone: &'a str,
|
||||
zone_records: &'a [DnsRecord],
|
||||
cache: &'a RwLock<DnsCache>,
|
||||
root_hints: &'a [std::net::SocketAddr],
|
||||
srtt: &'a RwLock<SrttCache>,
|
||||
trust_anchors: &'a [DnsRecord],
|
||||
depth: u8,
|
||||
stats: &'a Mutex<ValidationStats>,
|
||||
@@ -343,7 +349,7 @@ fn validate_chain<'a>(
|
||||
return DnssecStatus::Indeterminate;
|
||||
}
|
||||
let parent = parent_zone(zone);
|
||||
let ds_records = fetch_ds(zone, cache, root_hints, stats).await;
|
||||
let ds_records = fetch_ds(zone, cache, root_hints, srtt, stats).await;
|
||||
|
||||
if ds_records.is_empty() {
|
||||
debug!("dnssec: no DS for zone '{}' at parent '{}'", zone, parent);
|
||||
@@ -377,7 +383,7 @@ fn validate_chain<'a>(
|
||||
|
||||
// Walk up: validate the parent's DNSKEY
|
||||
trace!("dnssec: fetching parent DNSKEY for '{}'", parent);
|
||||
let parent_records = fetch_dnskeys(&parent, cache, root_hints, stats).await;
|
||||
let parent_records = fetch_dnskeys(&parent, cache, root_hints, srtt, stats).await;
|
||||
if parent_records.is_empty() {
|
||||
debug!("dnssec: no parent DNSKEY for '{}' — Indeterminate", parent);
|
||||
return DnssecStatus::Indeterminate;
|
||||
@@ -388,6 +394,7 @@ fn validate_chain<'a>(
|
||||
&parent_records,
|
||||
cache,
|
||||
root_hints,
|
||||
srtt,
|
||||
trust_anchors,
|
||||
depth + 1,
|
||||
stats,
|
||||
@@ -460,6 +467,7 @@ async fn fetch_dnskeys(
|
||||
zone: &str,
|
||||
cache: &RwLock<DnsCache>,
|
||||
root_hints: &[std::net::SocketAddr],
|
||||
srtt: &RwLock<SrttCache>,
|
||||
stats: &Mutex<ValidationStats>,
|
||||
) -> Vec<DnsRecord> {
|
||||
if let Some(pkt) = cache.read().unwrap().lookup(zone, QueryType::DNSKEY) {
|
||||
@@ -475,7 +483,8 @@ async fn fetch_dnskeys(
|
||||
trace!("dnssec: fetch_dnskeys('{}') cache miss — resolving", zone);
|
||||
stats.lock().unwrap().dnskey_fetches += 1;
|
||||
if let Ok(pkt) =
|
||||
crate::recursive::resolve_iterative(zone, QueryType::DNSKEY, cache, root_hints, 0, 0).await
|
||||
crate::recursive::resolve_iterative(zone, QueryType::DNSKEY, cache, root_hints, srtt, 0, 0)
|
||||
.await
|
||||
{
|
||||
cache.write().unwrap().insert(zone, QueryType::DNSKEY, &pkt);
|
||||
return pkt.answers;
|
||||
@@ -488,6 +497,7 @@ async fn fetch_ds(
|
||||
child: &str,
|
||||
cache: &RwLock<DnsCache>,
|
||||
root_hints: &[std::net::SocketAddr],
|
||||
srtt: &RwLock<SrttCache>,
|
||||
stats: &Mutex<ValidationStats>,
|
||||
) -> Vec<DnsRecord> {
|
||||
if let Some(pkt) = cache.read().unwrap().lookup(child, QueryType::DS) {
|
||||
@@ -501,7 +511,8 @@ async fn fetch_ds(
|
||||
|
||||
stats.lock().unwrap().ds_fetches += 1;
|
||||
if let Ok(pkt) =
|
||||
crate::recursive::resolve_iterative(child, QueryType::DS, cache, root_hints, 0, 0).await
|
||||
crate::recursive::resolve_iterative(child, QueryType::DS, cache, root_hints, srtt, 0, 0)
|
||||
.await
|
||||
{
|
||||
cache.write().unwrap().insert(child, QueryType::DS, &pkt);
|
||||
return pkt
|
||||
|
||||
@@ -16,6 +16,7 @@ pub mod question;
|
||||
pub mod record;
|
||||
pub mod recursive;
|
||||
pub mod service_store;
|
||||
pub mod srtt;
|
||||
pub mod stats;
|
||||
pub mod system_dns;
|
||||
pub mod tls;
|
||||
|
||||
@@ -201,6 +201,8 @@ async fn main() -> numa::Result<()> {
|
||||
tls_config: initial_tls,
|
||||
upstream_mode: config.upstream.mode,
|
||||
root_hints: numa::recursive::parse_root_hints(&config.upstream.root_hints),
|
||||
srtt: std::sync::RwLock::new(numa::srtt::SrttCache::new(config.upstream.srtt)),
|
||||
inflight: std::sync::Mutex::new(std::collections::HashMap::new()),
|
||||
dnssec_enabled: config.dnssec.enabled,
|
||||
dnssec_strict: config.dnssec.strict,
|
||||
});
|
||||
@@ -353,7 +355,12 @@ async fn main() -> numa::Result<()> {
|
||||
let prime_ctx = Arc::clone(&ctx);
|
||||
let prime_tlds = config.upstream.prime_tlds;
|
||||
tokio::spawn(async move {
|
||||
numa::recursive::prime_tld_cache(&prime_ctx.cache, &prime_ctx.root_hints, &prime_tlds)
|
||||
numa::recursive::prime_tld_cache(
|
||||
&prime_ctx.cache,
|
||||
&prime_ctx.root_hints,
|
||||
&prime_tlds,
|
||||
&prime_ctx.srtt,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
105
src/recursive.rs
105
src/recursive.rs
@@ -1,7 +1,7 @@
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::atomic::{AtomicU16, Ordering};
|
||||
use std::sync::RwLock;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use log::{debug, info};
|
||||
|
||||
@@ -11,6 +11,7 @@ use crate::header::ResultCode;
|
||||
use crate::packet::DnsPacket;
|
||||
use crate::question::{DnsQuestion, QueryType};
|
||||
use crate::record::DnsRecord;
|
||||
use crate::srtt::SrttCache;
|
||||
|
||||
const MAX_REFERRAL_DEPTH: u8 = 10;
|
||||
const MAX_CNAME_DEPTH: u8 = 8;
|
||||
@@ -20,7 +21,8 @@ const UDP_FAIL_THRESHOLD: u8 = 3;
|
||||
|
||||
static QUERY_ID: AtomicU16 = AtomicU16::new(1);
|
||||
static UDP_FAILURES: std::sync::atomic::AtomicU8 = std::sync::atomic::AtomicU8::new(0);
|
||||
static UDP_DISABLED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
|
||||
pub(crate) static UDP_DISABLED: std::sync::atomic::AtomicBool =
|
||||
std::sync::atomic::AtomicBool::new(false);
|
||||
|
||||
fn next_id() -> u16 {
|
||||
QUERY_ID.fetch_add(1, Ordering::Relaxed)
|
||||
@@ -58,7 +60,12 @@ pub async fn probe_udp(root_hints: &[SocketAddr]) {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn prime_tld_cache(cache: &RwLock<DnsCache>, root_hints: &[SocketAddr], tlds: &[String]) {
|
||||
pub async fn prime_tld_cache(
|
||||
cache: &RwLock<DnsCache>,
|
||||
root_hints: &[SocketAddr],
|
||||
tlds: &[String],
|
||||
srtt: &RwLock<SrttCache>,
|
||||
) {
|
||||
if root_hints.is_empty() || tlds.is_empty() {
|
||||
return;
|
||||
}
|
||||
@@ -66,7 +73,7 @@ pub async fn prime_tld_cache(cache: &RwLock<DnsCache>, root_hints: &[SocketAddr]
|
||||
let mut root_addr = root_hints[0];
|
||||
for hint in root_hints {
|
||||
info!("prime: probing root {}", hint);
|
||||
match send_query(".", QueryType::NS, *hint).await {
|
||||
match send_query(".", QueryType::NS, *hint, srtt).await {
|
||||
Ok(_) => {
|
||||
info!("prime: root {} reachable", hint);
|
||||
root_addr = *hint;
|
||||
@@ -79,7 +86,7 @@ pub async fn prime_tld_cache(cache: &RwLock<DnsCache>, root_hints: &[SocketAddr]
|
||||
}
|
||||
|
||||
// Fetch root DNSKEY (needed for DNSSEC chain-of-trust terminus)
|
||||
if let Ok(root_dnskey) = send_query(".", QueryType::DNSKEY, root_addr).await {
|
||||
if let Ok(root_dnskey) = send_query(".", QueryType::DNSKEY, root_addr, srtt).await {
|
||||
cache
|
||||
.write()
|
||||
.unwrap()
|
||||
@@ -91,7 +98,7 @@ pub async fn prime_tld_cache(cache: &RwLock<DnsCache>, root_hints: &[SocketAddr]
|
||||
|
||||
for tld in tlds {
|
||||
// Fetch NS referral (includes DS in authority section from root)
|
||||
let response = match send_query(tld, QueryType::NS, root_addr).await {
|
||||
let response = match send_query(tld, QueryType::NS, root_addr, srtt).await {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
debug!("prime: failed to query NS for .{}: {}", tld, e);
|
||||
@@ -108,7 +115,6 @@ pub async fn prime_tld_cache(cache: &RwLock<DnsCache>, root_hints: &[SocketAddr]
|
||||
let mut cache_w = cache.write().unwrap();
|
||||
cache_w.insert(tld, QueryType::NS, &response);
|
||||
cache_glue(&mut cache_w, &response, &ns_names);
|
||||
// Cache DS records from referral authority section
|
||||
cache_ds_from_authority(&mut cache_w, &response);
|
||||
}
|
||||
|
||||
@@ -116,7 +122,7 @@ pub async fn prime_tld_cache(cache: &RwLock<DnsCache>, root_hints: &[SocketAddr]
|
||||
let first_ns_name = ns_names.first().map(|s| s.as_str()).unwrap_or("");
|
||||
let first_ns = glue_addrs_for(&response, first_ns_name);
|
||||
if let Some(ns_addr) = first_ns.first() {
|
||||
if let Ok(dnskey_resp) = send_query(tld, QueryType::DNSKEY, *ns_addr).await {
|
||||
if let Ok(dnskey_resp) = send_query(tld, QueryType::DNSKEY, *ns_addr, srtt).await {
|
||||
cache
|
||||
.write()
|
||||
.unwrap()
|
||||
@@ -140,10 +146,11 @@ pub async fn resolve_recursive(
|
||||
cache: &RwLock<DnsCache>,
|
||||
original_query: &DnsPacket,
|
||||
root_hints: &[SocketAddr],
|
||||
srtt: &RwLock<SrttCache>,
|
||||
) -> crate::Result<DnsPacket> {
|
||||
// No overall timeout — each hop is bounded by NS_QUERY_TIMEOUT (UDP + TCP fallback),
|
||||
// and MAX_REFERRAL_DEPTH caps the chain length.
|
||||
let mut resp = resolve_iterative(qname, qtype, cache, root_hints, 0, 0).await?;
|
||||
let mut resp = resolve_iterative(qname, qtype, cache, root_hints, srtt, 0, 0).await?;
|
||||
|
||||
resp.header.id = original_query.header.id;
|
||||
resp.header.recursion_available = true;
|
||||
@@ -157,6 +164,7 @@ pub(crate) fn resolve_iterative<'a>(
|
||||
qtype: QueryType,
|
||||
cache: &'a RwLock<DnsCache>,
|
||||
root_hints: &'a [SocketAddr],
|
||||
srtt: &'a RwLock<SrttCache>,
|
||||
referral_depth: u8,
|
||||
cname_depth: u8,
|
||||
) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::Result<DnsPacket>> + Send + 'a>> {
|
||||
@@ -170,6 +178,7 @@ pub(crate) fn resolve_iterative<'a>(
|
||||
}
|
||||
|
||||
let (mut current_zone, mut ns_addrs) = find_closest_ns(qname, cache, root_hints);
|
||||
srtt.read().unwrap().sort_by_rtt(&mut ns_addrs);
|
||||
let mut ns_idx = 0;
|
||||
|
||||
for _ in 0..MAX_REFERRAL_DEPTH {
|
||||
@@ -185,7 +194,7 @@ pub(crate) fn resolve_iterative<'a>(
|
||||
ns_addr, q_type, q_name, current_zone, referral_depth
|
||||
);
|
||||
|
||||
let response = match send_query(q_name, q_type, ns_addr).await {
|
||||
let response = match send_query(q_name, q_type, ns_addr, srtt).await {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
debug!("recursive: NS {} failed: {}", ns_addr, e);
|
||||
@@ -194,7 +203,6 @@ pub(crate) fn resolve_iterative<'a>(
|
||||
}
|
||||
};
|
||||
|
||||
// Minimized query response — treat as referral, not final answer
|
||||
if (q_type != qtype || !q_name.eq_ignore_ascii_case(qname))
|
||||
&& (!response.authorities.is_empty() || !response.answers.is_empty())
|
||||
{
|
||||
@@ -205,8 +213,9 @@ pub(crate) fn resolve_iterative<'a>(
|
||||
if all_ns.is_empty() {
|
||||
all_ns = extract_ns_names(&response);
|
||||
}
|
||||
let new_addrs = resolve_ns_addrs_from_glue(&response, &all_ns, cache);
|
||||
let mut new_addrs = resolve_ns_addrs_from_glue(&response, &all_ns, cache);
|
||||
if !new_addrs.is_empty() {
|
||||
srtt.read().unwrap().sort_by_rtt(&mut new_addrs);
|
||||
ns_addrs = new_addrs;
|
||||
ns_idx = 0;
|
||||
continue;
|
||||
@@ -233,6 +242,7 @@ pub(crate) fn resolve_iterative<'a>(
|
||||
qtype,
|
||||
cache,
|
||||
root_hints,
|
||||
srtt,
|
||||
0,
|
||||
cname_depth + 1,
|
||||
)
|
||||
@@ -256,8 +266,6 @@ pub(crate) fn resolve_iterative<'a>(
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
// Referral — extract NS + glue, cache glue, resolve NS addresses
|
||||
// Update zone for query minimization
|
||||
if let Some(zone) = referral_zone(&response) {
|
||||
current_zone = zone;
|
||||
}
|
||||
@@ -276,13 +284,13 @@ pub(crate) fn resolve_iterative<'a>(
|
||||
for ns_name in &ns_names {
|
||||
if referral_depth < MAX_REFERRAL_DEPTH {
|
||||
debug!("recursive: resolving glue-less NS {}", ns_name);
|
||||
// Try A first, then AAAA
|
||||
for qt in [QueryType::A, QueryType::AAAA] {
|
||||
if let Ok(ns_resp) = resolve_iterative(
|
||||
ns_name,
|
||||
qt,
|
||||
cache,
|
||||
root_hints,
|
||||
srtt,
|
||||
referral_depth + 1,
|
||||
cname_depth,
|
||||
)
|
||||
@@ -316,6 +324,7 @@ pub(crate) fn resolve_iterative<'a>(
|
||||
return Err(format!("could not resolve any NS for {}", qname).into());
|
||||
}
|
||||
|
||||
srtt.read().unwrap().sort_by_rtt(&mut new_ns_addrs);
|
||||
ns_addrs = new_ns_addrs;
|
||||
ns_idx = 0;
|
||||
}
|
||||
@@ -561,7 +570,32 @@ fn make_glue_packet() -> DnsPacket {
|
||||
pkt
|
||||
}
|
||||
|
||||
async fn send_query(qname: &str, qtype: QueryType, server: SocketAddr) -> crate::Result<DnsPacket> {
|
||||
async fn tcp_with_srtt(
|
||||
query: &DnsPacket,
|
||||
server: SocketAddr,
|
||||
srtt: &RwLock<SrttCache>,
|
||||
start: Instant,
|
||||
) -> crate::Result<DnsPacket> {
|
||||
match crate::forward::forward_tcp(query, server, TCP_TIMEOUT).await {
|
||||
Ok(resp) => {
|
||||
srtt.write()
|
||||
.unwrap()
|
||||
.record_rtt(server.ip(), start.elapsed().as_millis() as u64, true);
|
||||
Ok(resp)
|
||||
}
|
||||
Err(e) => {
|
||||
srtt.write().unwrap().record_failure(server.ip());
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_query(
|
||||
qname: &str,
|
||||
qtype: QueryType,
|
||||
server: SocketAddr,
|
||||
srtt: &RwLock<SrttCache>,
|
||||
) -> crate::Result<DnsPacket> {
|
||||
let mut query = DnsPacket::new();
|
||||
query.header.id = next_id();
|
||||
query.header.recursion_desired = false;
|
||||
@@ -573,24 +607,30 @@ async fn send_query(qname: &str, qtype: QueryType, server: SocketAddr) -> crate:
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
// Skip IPv6 if the socket can't handle it (bound to 0.0.0.0)
|
||||
let start = Instant::now();
|
||||
|
||||
// IPv6 forced to TCP — our UDP socket is bound to 0.0.0.0
|
||||
if server.is_ipv6() {
|
||||
return crate::forward::forward_tcp(&query, server, TCP_TIMEOUT).await;
|
||||
return tcp_with_srtt(&query, server, srtt, start).await;
|
||||
}
|
||||
|
||||
// If UDP has been detected as blocked, go TCP-first
|
||||
// UDP detected as blocked — go TCP-first
|
||||
if UDP_DISABLED.load(Ordering::Acquire) {
|
||||
return crate::forward::forward_tcp(&query, server, TCP_TIMEOUT).await;
|
||||
return tcp_with_srtt(&query, server, srtt, start).await;
|
||||
}
|
||||
|
||||
match forward_udp(&query, server, NS_QUERY_TIMEOUT).await {
|
||||
Ok(resp) if resp.header.truncated_message => {
|
||||
debug!("send_query: truncated from {}, retrying TCP", server);
|
||||
crate::forward::forward_tcp(&query, server, TCP_TIMEOUT).await
|
||||
tcp_with_srtt(&query, server, srtt, start).await
|
||||
}
|
||||
Ok(resp) => {
|
||||
// UDP works — reset failure counter
|
||||
UDP_FAILURES.store(0, Ordering::Release);
|
||||
srtt.write().unwrap().record_rtt(
|
||||
server.ip(),
|
||||
start.elapsed().as_millis() as u64,
|
||||
false,
|
||||
);
|
||||
Ok(resp)
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -603,7 +643,7 @@ async fn send_query(qname: &str, qtype: QueryType, server: SocketAddr) -> crate:
|
||||
);
|
||||
}
|
||||
debug!("send_query: UDP failed for {}: {}, trying TCP", server, e);
|
||||
crate::forward::forward_tcp(&query, server, TCP_TIMEOUT).await
|
||||
tcp_with_srtt(&query, server, srtt, start).await
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -894,7 +934,8 @@ mod tests {
|
||||
})
|
||||
.await;
|
||||
|
||||
let result = send_query("test.example.com", QueryType::A, server_addr).await;
|
||||
let srtt = RwLock::new(SrttCache::new(true));
|
||||
let result = send_query("test.example.com", QueryType::A, server_addr, &srtt).await;
|
||||
|
||||
let resp = result.expect("should resolve via TCP fallback");
|
||||
assert_eq!(resp.header.rescode, ResultCode::NOERROR);
|
||||
@@ -945,7 +986,8 @@ mod tests {
|
||||
})
|
||||
.await;
|
||||
|
||||
let result = send_query("hello.example.com", QueryType::A, server_addr).await;
|
||||
let srtt = RwLock::new(SrttCache::new(true));
|
||||
let result = send_query("hello.example.com", QueryType::A, server_addr, &srtt).await;
|
||||
let resp = result.expect("TCP-only send_query should work");
|
||||
assert_eq!(resp.header.rescode, ResultCode::NOERROR);
|
||||
match &resp.answers[0] {
|
||||
@@ -967,10 +1009,19 @@ mod tests {
|
||||
.await;
|
||||
|
||||
let cache = RwLock::new(DnsCache::new(100, 60, 86400));
|
||||
let srtt = RwLock::new(SrttCache::new(true));
|
||||
let root_hints = vec![server_addr];
|
||||
|
||||
let result =
|
||||
resolve_iterative("nonexistent.test", QueryType::A, &cache, &root_hints, 0, 0).await;
|
||||
let result = resolve_iterative(
|
||||
"nonexistent.test",
|
||||
QueryType::A,
|
||||
&cache,
|
||||
&root_hints,
|
||||
&srtt,
|
||||
0,
|
||||
0,
|
||||
)
|
||||
.await;
|
||||
|
||||
let resp = result.expect("NXDOMAIN should still return a response");
|
||||
assert_eq!(resp.header.rescode, ResultCode::NXDOMAIN);
|
||||
|
||||
327
src/srtt.rs
Normal file
327
src/srtt.rs
Normal file
@@ -0,0 +1,327 @@
|
||||
use std::collections::HashMap;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::time::Instant;
|
||||
|
||||
const INITIAL_SRTT_MS: u64 = 200;
|
||||
const FAILURE_PENALTY_MS: u64 = 5000;
|
||||
const TCP_PENALTY_MS: u64 = 100;
|
||||
const DECAY_AFTER_SECS: u64 = 300;
|
||||
const MAX_ENTRIES: usize = 4096;
|
||||
const EVICT_BATCH: usize = 64;
|
||||
|
||||
struct SrttEntry {
|
||||
srtt_ms: u64,
|
||||
updated_at: Instant,
|
||||
}
|
||||
|
||||
pub struct SrttCache {
|
||||
entries: HashMap<IpAddr, SrttEntry>,
|
||||
enabled: bool,
|
||||
}
|
||||
|
||||
impl Default for SrttCache {
|
||||
fn default() -> Self {
|
||||
Self::new(true)
|
||||
}
|
||||
}
|
||||
|
||||
impl SrttCache {
|
||||
pub fn new(enabled: bool) -> Self {
|
||||
Self {
|
||||
entries: HashMap::new(),
|
||||
enabled,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_enabled(&self) -> bool {
|
||||
self.enabled
|
||||
}
|
||||
|
||||
/// Get current SRTT for an IP, applying decay if stale. Returns INITIAL for unknown.
|
||||
pub fn get(&self, ip: IpAddr) -> u64 {
|
||||
match self.entries.get(&ip) {
|
||||
Some(entry) => Self::decayed_srtt(entry),
|
||||
None => INITIAL_SRTT_MS,
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply time-based decay: each DECAY_AFTER_SECS period halves distance to INITIAL.
|
||||
fn decayed_srtt(entry: &SrttEntry) -> u64 {
|
||||
let age_secs = entry.updated_at.elapsed().as_secs();
|
||||
if age_secs > DECAY_AFTER_SECS {
|
||||
let periods = (age_secs / DECAY_AFTER_SECS).min(8);
|
||||
let mut srtt = entry.srtt_ms;
|
||||
for _ in 0..periods {
|
||||
srtt = (srtt + INITIAL_SRTT_MS) / 2;
|
||||
}
|
||||
srtt
|
||||
} else {
|
||||
entry.srtt_ms
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a successful query RTT. No-op when disabled.
|
||||
pub fn record_rtt(&mut self, ip: IpAddr, rtt_ms: u64, tcp: bool) {
|
||||
if !self.enabled {
|
||||
return;
|
||||
}
|
||||
let effective = if tcp { rtt_ms + TCP_PENALTY_MS } else { rtt_ms };
|
||||
self.maybe_evict();
|
||||
let entry = self.entries.entry(ip).or_insert(SrttEntry {
|
||||
srtt_ms: effective,
|
||||
updated_at: Instant::now(),
|
||||
});
|
||||
// Apply decay before EWMA so recovered servers aren't stuck at stale penalties
|
||||
let base = Self::decayed_srtt(entry);
|
||||
// BIND EWMA: new = (old * 7 + sample) / 8
|
||||
entry.srtt_ms = (base * 7 + effective) / 8;
|
||||
entry.updated_at = Instant::now();
|
||||
}
|
||||
|
||||
/// Record a failure (timeout or error). No-op when disabled.
|
||||
pub fn record_failure(&mut self, ip: IpAddr) {
|
||||
if !self.enabled {
|
||||
return;
|
||||
}
|
||||
self.maybe_evict();
|
||||
let entry = self.entries.entry(ip).or_insert(SrttEntry {
|
||||
srtt_ms: FAILURE_PENALTY_MS,
|
||||
updated_at: Instant::now(),
|
||||
});
|
||||
entry.srtt_ms = FAILURE_PENALTY_MS;
|
||||
entry.updated_at = Instant::now();
|
||||
}
|
||||
|
||||
/// Sort addresses by SRTT ascending (lowest/fastest first). No-op when disabled.
|
||||
pub fn sort_by_rtt(&self, addrs: &mut [SocketAddr]) {
|
||||
if !self.enabled {
|
||||
return;
|
||||
}
|
||||
addrs.sort_by_key(|a| self.get(a.ip()));
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.entries.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.entries.is_empty()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn set_updated_at(&mut self, ip: IpAddr, at: Instant) {
|
||||
if let Some(entry) = self.entries.get_mut(&ip) {
|
||||
entry.updated_at = at;
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_evict(&mut self) {
|
||||
if self.entries.len() < MAX_ENTRIES {
|
||||
return;
|
||||
}
|
||||
// Batch eviction: remove the oldest EVICT_BATCH entries at once
|
||||
let mut by_age: Vec<IpAddr> = self.entries.keys().copied().collect();
|
||||
by_age.sort_by_key(|ip| self.entries[ip].updated_at);
|
||||
for ip in by_age.into_iter().take(EVICT_BATCH) {
|
||||
self.entries.remove(&ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::net::Ipv4Addr;
|
||||
|
||||
fn ip(last: u8) -> IpAddr {
|
||||
IpAddr::V4(Ipv4Addr::new(192, 0, 2, last))
|
||||
}
|
||||
|
||||
fn sock(last: u8) -> SocketAddr {
|
||||
SocketAddr::new(ip(last), 53)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_returns_initial() {
|
||||
let cache = SrttCache::new(true);
|
||||
assert_eq!(cache.get(ip(1)), INITIAL_SRTT_MS);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ewma_converges() {
|
||||
let mut cache = SrttCache::new(true);
|
||||
for _ in 0..20 {
|
||||
cache.record_rtt(ip(1), 100, false);
|
||||
}
|
||||
let srtt = cache.get(ip(1));
|
||||
assert!(srtt >= 98 && srtt <= 102, "srtt={}", srtt);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn failure_sets_penalty() {
|
||||
let mut cache = SrttCache::new(true);
|
||||
cache.record_rtt(ip(1), 50, false);
|
||||
cache.record_failure(ip(1));
|
||||
assert_eq!(cache.get(ip(1)), FAILURE_PENALTY_MS);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tcp_penalty_added() {
|
||||
let mut cache = SrttCache::new(true);
|
||||
for _ in 0..20 {
|
||||
cache.record_rtt(ip(1), 50, true);
|
||||
}
|
||||
let srtt = cache.get(ip(1));
|
||||
assert!(srtt >= 148 && srtt <= 152, "srtt={}", srtt);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sort_by_rtt_orders_correctly() {
|
||||
let mut cache = SrttCache::new(true);
|
||||
for _ in 0..20 {
|
||||
cache.record_rtt(ip(1), 500, false);
|
||||
cache.record_rtt(ip(2), 100, false);
|
||||
cache.record_rtt(ip(3), 10, false);
|
||||
}
|
||||
let mut addrs = vec![sock(1), sock(2), sock(3)];
|
||||
cache.sort_by_rtt(&mut addrs);
|
||||
assert_eq!(addrs, vec![sock(3), sock(2), sock(1)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_servers_sort_equal() {
|
||||
let cache = SrttCache::new(true);
|
||||
let mut addrs = vec![sock(1), sock(2), sock(3)];
|
||||
let original = addrs.clone();
|
||||
cache.sort_by_rtt(&mut addrs);
|
||||
assert_eq!(addrs, original);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn disabled_is_noop() {
|
||||
let mut cache = SrttCache::new(false);
|
||||
cache.record_rtt(ip(1), 50, false);
|
||||
cache.record_failure(ip(2));
|
||||
assert_eq!(cache.len(), 0);
|
||||
|
||||
let mut addrs = vec![sock(2), sock(1)];
|
||||
let original = addrs.clone();
|
||||
cache.sort_by_rtt(&mut addrs);
|
||||
assert_eq!(addrs, original);
|
||||
}
|
||||
|
||||
fn age(secs: u64) -> Instant {
|
||||
Instant::now() - std::time::Duration::from_secs(secs)
|
||||
}
|
||||
|
||||
/// Cache with ip(1) saturated at FAILURE_PENALTY_MS
|
||||
fn saturated_penalty_cache() -> SrttCache {
|
||||
let mut cache = SrttCache::new(true);
|
||||
for _ in 0..30 {
|
||||
cache.record_rtt(ip(1), FAILURE_PENALTY_MS, false);
|
||||
}
|
||||
cache
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_decay_within_threshold() {
|
||||
let mut cache = SrttCache::new(true);
|
||||
cache.record_rtt(ip(1), 5000, false);
|
||||
cache.set_updated_at(ip(1), age(DECAY_AFTER_SECS));
|
||||
assert_eq!(cache.get(ip(1)), cache.entries[&ip(1)].srtt_ms);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn one_decay_period() {
|
||||
let mut cache = saturated_penalty_cache();
|
||||
let raw = cache.entries[&ip(1)].srtt_ms;
|
||||
cache.set_updated_at(ip(1), age(DECAY_AFTER_SECS + 1));
|
||||
let expected = (raw + INITIAL_SRTT_MS) / 2;
|
||||
assert_eq!(cache.get(ip(1)), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_decay_periods() {
|
||||
let mut cache = saturated_penalty_cache();
|
||||
let raw = cache.entries[&ip(1)].srtt_ms;
|
||||
cache.set_updated_at(ip(1), age(DECAY_AFTER_SECS * 4 + 1));
|
||||
let mut expected = raw;
|
||||
for _ in 0..4 {
|
||||
expected = (expected + INITIAL_SRTT_MS) / 2;
|
||||
}
|
||||
assert_eq!(cache.get(ip(1)), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decay_caps_at_8_periods() {
|
||||
// 9 periods and 100 periods should produce the same result (capped at 8)
|
||||
let mut cache_a = saturated_penalty_cache();
|
||||
let mut cache_b = saturated_penalty_cache();
|
||||
cache_a.set_updated_at(ip(1), age(DECAY_AFTER_SECS * 9 + 1));
|
||||
cache_b.set_updated_at(ip(1), age(DECAY_AFTER_SECS * 100));
|
||||
assert_eq!(cache_a.get(ip(1)), cache_b.get(ip(1)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decay_converges_toward_initial() {
|
||||
let mut cache = saturated_penalty_cache();
|
||||
cache.set_updated_at(ip(1), age(DECAY_AFTER_SECS * 100));
|
||||
let decayed = cache.get(ip(1));
|
||||
let diff = decayed.abs_diff(INITIAL_SRTT_MS);
|
||||
assert!(
|
||||
diff < 25,
|
||||
"expected near INITIAL_SRTT_MS, got {} (diff={})",
|
||||
decayed,
|
||||
diff
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn record_rtt_applies_decay_before_ewma() {
|
||||
let mut cache = saturated_penalty_cache();
|
||||
cache.set_updated_at(ip(1), age(DECAY_AFTER_SECS * 8));
|
||||
cache.record_rtt(ip(1), 50, false);
|
||||
let srtt = cache.get(ip(1));
|
||||
// Without decay-before-EWMA, result would be ~(5000*7+50)/8 ≈ 4381
|
||||
assert!(srtt < 500, "expected decay before EWMA, got srtt={}", srtt);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decay_reranks_stale_failures() {
|
||||
let mut cache = saturated_penalty_cache();
|
||||
for _ in 0..30 {
|
||||
cache.record_rtt(ip(2), 300, false);
|
||||
}
|
||||
let mut addrs = vec![sock(1), sock(2)];
|
||||
cache.sort_by_rtt(&mut addrs);
|
||||
assert_eq!(addrs, vec![sock(2), sock(1)]);
|
||||
|
||||
// Age server 1 so it decays toward INITIAL (200ms) — below server 2's 300ms
|
||||
cache.set_updated_at(ip(1), age(DECAY_AFTER_SECS * 100));
|
||||
let mut addrs = vec![sock(1), sock(2)];
|
||||
cache.sort_by_rtt(&mut addrs);
|
||||
assert_eq!(addrs, vec![sock(1), sock(2)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn eviction_removes_oldest() {
|
||||
let mut cache = SrttCache::new(true);
|
||||
for i in 0..MAX_ENTRIES {
|
||||
let octets = [
|
||||
10,
|
||||
((i >> 16) & 0xFF) as u8,
|
||||
((i >> 8) & 0xFF) as u8,
|
||||
(i & 0xFF) as u8,
|
||||
];
|
||||
cache.record_rtt(
|
||||
IpAddr::V4(Ipv4Addr::new(octets[0], octets[1], octets[2], octets[3])),
|
||||
100,
|
||||
false,
|
||||
);
|
||||
}
|
||||
assert_eq!(cache.len(), MAX_ENTRIES);
|
||||
cache.record_rtt(ip(1), 100, false);
|
||||
// Batch eviction removes EVICT_BATCH entries
|
||||
assert!(cache.len() <= MAX_ENTRIES - EVICT_BATCH + 1);
|
||||
}
|
||||
}
|
||||
12
src/stats.rs
12
src/stats.rs
@@ -4,6 +4,7 @@ pub struct ServerStats {
|
||||
queries_total: u64,
|
||||
queries_forwarded: u64,
|
||||
queries_recursive: u64,
|
||||
queries_coalesced: u64,
|
||||
queries_cached: u64,
|
||||
queries_blocked: u64,
|
||||
queries_local: u64,
|
||||
@@ -18,6 +19,7 @@ pub enum QueryPath {
|
||||
Cached,
|
||||
Forwarded,
|
||||
Recursive,
|
||||
Coalesced,
|
||||
Blocked,
|
||||
Overridden,
|
||||
UpstreamError,
|
||||
@@ -30,6 +32,7 @@ impl QueryPath {
|
||||
QueryPath::Cached => "CACHED",
|
||||
QueryPath::Forwarded => "FORWARD",
|
||||
QueryPath::Recursive => "RECURSIVE",
|
||||
QueryPath::Coalesced => "COALESCED",
|
||||
QueryPath::Blocked => "BLOCKED",
|
||||
QueryPath::Overridden => "OVERRIDE",
|
||||
QueryPath::UpstreamError => "SERVFAIL",
|
||||
@@ -45,6 +48,8 @@ impl QueryPath {
|
||||
Some(QueryPath::Forwarded)
|
||||
} else if s.eq_ignore_ascii_case("RECURSIVE") {
|
||||
Some(QueryPath::Recursive)
|
||||
} else if s.eq_ignore_ascii_case("COALESCED") {
|
||||
Some(QueryPath::Coalesced)
|
||||
} else if s.eq_ignore_ascii_case("BLOCKED") {
|
||||
Some(QueryPath::Blocked)
|
||||
} else if s.eq_ignore_ascii_case("OVERRIDE") {
|
||||
@@ -69,6 +74,7 @@ impl ServerStats {
|
||||
queries_total: 0,
|
||||
queries_forwarded: 0,
|
||||
queries_recursive: 0,
|
||||
queries_coalesced: 0,
|
||||
queries_cached: 0,
|
||||
queries_blocked: 0,
|
||||
queries_local: 0,
|
||||
@@ -85,6 +91,7 @@ impl ServerStats {
|
||||
QueryPath::Cached => self.queries_cached += 1,
|
||||
QueryPath::Forwarded => self.queries_forwarded += 1,
|
||||
QueryPath::Recursive => self.queries_recursive += 1,
|
||||
QueryPath::Coalesced => self.queries_coalesced += 1,
|
||||
QueryPath::Blocked => self.queries_blocked += 1,
|
||||
QueryPath::Overridden => self.queries_overridden += 1,
|
||||
QueryPath::UpstreamError => self.upstream_errors += 1,
|
||||
@@ -106,6 +113,7 @@ impl ServerStats {
|
||||
total: self.queries_total,
|
||||
forwarded: self.queries_forwarded,
|
||||
recursive: self.queries_recursive,
|
||||
coalesced: self.queries_coalesced,
|
||||
cached: self.queries_cached,
|
||||
local: self.queries_local,
|
||||
overridden: self.queries_overridden,
|
||||
@@ -121,11 +129,12 @@ impl ServerStats {
|
||||
let secs = uptime.as_secs() % 60;
|
||||
|
||||
log::info!(
|
||||
"STATS | uptime {}h{}m{}s | total {} | fwd {} | recursive {} | cached {} | local {} | override {} | blocked {} | errors {}",
|
||||
"STATS | uptime {}h{}m{}s | total {} | fwd {} | recursive {} | coalesced {} | cached {} | local {} | override {} | blocked {} | errors {}",
|
||||
hours, mins, secs,
|
||||
self.queries_total,
|
||||
self.queries_forwarded,
|
||||
self.queries_recursive,
|
||||
self.queries_coalesced,
|
||||
self.queries_cached,
|
||||
self.queries_local,
|
||||
self.queries_overridden,
|
||||
@@ -140,6 +149,7 @@ pub struct StatsSnapshot {
|
||||
pub total: u64,
|
||||
pub forwarded: u64,
|
||||
pub recursive: u64,
|
||||
pub coalesced: u64,
|
||||
pub cached: u64,
|
||||
pub local: u64,
|
||||
pub overridden: u64,
|
||||
|
||||
Reference in New Issue
Block a user