Merge pull request #85 from razvandimescu/feat/wire-forwarding-hedging

feat: wire-level forwarding, cache, and request hedging
This commit was merged in pull request #85.
This commit is contained in:
Razvan Dimescu
2026-04-12 22:02:45 +03:00
committed by GitHub
19 changed files with 3759 additions and 285 deletions

460
Cargo.lock generated
View File

@@ -82,6 +82,12 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "anyhow"
version = "1.0.102"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c"
[[package]] [[package]]
name = "arc-swap" name = "arc-swap"
version = "1.9.0" version = "1.9.0"
@@ -142,6 +148,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "async-trait"
version = "0.1.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "atomic-waker" name = "atomic-waker"
version = "1.1.2" version = "1.1.2"
@@ -410,6 +427,21 @@ dependencies = [
"itertools", "itertools",
] ]
[[package]]
name = "critical-section"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b"
[[package]]
name = "crossbeam-channel"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
dependencies = [
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-deque" name = "crossbeam-deque"
version = "0.8.6" version = "0.8.6"
@@ -493,6 +525,18 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]]
name = "enum-as-inner"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "env_filter" name = "env_filter"
version = "1.0.1" version = "1.0.1"
@@ -554,6 +598,12 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foldhash"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]] [[package]]
name = "form_urlencoded" name = "form_urlencoded"
version = "1.2.2" version = "1.2.2"
@@ -679,11 +729,24 @@ dependencies = [
"cfg-if", "cfg-if",
"js-sys", "js-sys",
"libc", "libc",
"r-efi", "r-efi 5.3.0",
"wasip2", "wasip2",
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "getrandom"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555"
dependencies = [
"cfg-if",
"libc",
"r-efi 6.0.0",
"wasip2",
"wasip3",
]
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.4.13" version = "0.4.13"
@@ -714,12 +777,82 @@ dependencies = [
"zerocopy", "zerocopy",
] ]
[[package]]
name = "hashbrown"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"foldhash",
]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.16.1" version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hickory-proto"
version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8a6fe56c0038198998a6f217ca4e7ef3a5e51f46163bd6dd60b5c71ca6c6502"
dependencies = [
"async-trait",
"bytes",
"cfg-if",
"data-encoding",
"enum-as-inner",
"futures-channel",
"futures-io",
"futures-util",
"h2",
"http",
"idna",
"ipnet",
"once_cell",
"rand",
"ring",
"rustls",
"thiserror",
"tinyvec",
"tokio",
"tokio-rustls",
"tracing",
"url",
"webpki-roots 0.26.11",
]
[[package]]
name = "hickory-resolver"
version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc62a9a99b0bfb44d2ab95a7208ac952d31060efc16241c87eaf36406fecf87a"
dependencies = [
"cfg-if",
"futures-util",
"hickory-proto",
"ipconfig",
"moka",
"once_cell",
"parking_lot",
"rand",
"resolv-conf",
"rustls",
"smallvec",
"thiserror",
"tokio",
"tokio-rustls",
"tracing",
"webpki-roots 0.26.11",
]
[[package]] [[package]]
name = "http" name = "http"
version = "1.4.0" version = "1.4.0"
@@ -802,7 +935,7 @@ dependencies = [
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
"tower-service", "tower-service",
"webpki-roots", "webpki-roots 1.0.6",
] ]
[[package]] [[package]]
@@ -909,6 +1042,12 @@ dependencies = [
"zerovec", "zerovec",
] ]
[[package]]
name = "id-arena"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954"
[[package]] [[package]]
name = "idna" name = "idna"
version = "1.1.0" version = "1.1.0"
@@ -937,7 +1076,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017"
dependencies = [ dependencies = [
"equivalent", "equivalent",
"hashbrown", "hashbrown 0.16.1",
"serde",
"serde_core",
]
[[package]]
name = "ipconfig"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d40460c0ce33d6ce4b0630ad68ff63d6661961c48b6dba35e5a4d81cfb48222"
dependencies = [
"socket2",
"widestring",
"windows-registry",
"windows-result",
"windows-sys 0.61.2",
] ]
[[package]] [[package]]
@@ -1029,6 +1183,12 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "leb128fmt"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.183" version = "0.2.183"
@@ -1041,6 +1201,15 @@ version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77"
[[package]]
name = "lock_api"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965"
dependencies = [
"scopeguard",
]
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.29" version = "0.4.29"
@@ -1098,6 +1267,23 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "moka"
version = "0.12.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "957228ad12042ee839f93c8f257b62b4c0ab5eaae1d4fa60de53b27c9d7c5046"
dependencies = [
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"equivalent",
"parking_lot",
"portable-atomic",
"smallvec",
"tagptr",
"uuid",
]
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.3" version = "7.1.3"
@@ -1151,6 +1337,8 @@ dependencies = [
"criterion", "criterion",
"env_logger", "env_logger",
"futures", "futures",
"hickory-proto",
"hickory-resolver",
"http", "http",
"http-body-util", "http-body-util",
"hyper", "hyper",
@@ -1170,7 +1358,7 @@ dependencies = [
"tokio-rustls", "tokio-rustls",
"toml", "toml",
"tower", "tower",
"webpki-roots", "webpki-roots 1.0.6",
] ]
[[package]] [[package]]
@@ -1187,6 +1375,10 @@ name = "once_cell"
version = "1.21.4" version = "1.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50"
dependencies = [
"critical-section",
"portable-atomic",
]
[[package]] [[package]]
name = "once_cell_polyfill" name = "once_cell_polyfill"
@@ -1210,6 +1402,29 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "parking_lot"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-link",
]
[[package]] [[package]]
name = "pem" name = "pem"
version = "3.0.6" version = "3.0.6"
@@ -1305,6 +1520,16 @@ dependencies = [
"zerocopy", "zerocopy",
] ]
[[package]]
name = "prettyplease"
version = "0.2.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b"
dependencies = [
"proc-macro2",
"syn",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.106" version = "1.0.106"
@@ -1390,6 +1615,12 @@ version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "r-efi"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf"
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.9.2" version = "0.9.2"
@@ -1453,6 +1684,15 @@ dependencies = [
"yasna", "yasna",
] ]
[[package]]
name = "redox_syscall"
version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
dependencies = [
"bitflags",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.12.3" version = "1.12.3"
@@ -1518,9 +1758,15 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
"wasm-bindgen-futures", "wasm-bindgen-futures",
"web-sys", "web-sys",
"webpki-roots", "webpki-roots 1.0.6",
] ]
[[package]]
name = "resolv-conf"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7"
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.17.14" version = "0.17.14"
@@ -1618,6 +1864,18 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "semver"
version = "1.0.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.228" version = "1.0.228"
@@ -1780,6 +2038,12 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tagptr"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "2.0.18" version = "2.0.18"
@@ -2038,6 +2302,12 @@ version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75"
[[package]]
name = "unicode-xid"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853"
[[package]] [[package]]
name = "untrusted" name = "untrusted"
version = "0.9.0" version = "0.9.0"
@@ -2068,6 +2338,17 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9"
dependencies = [
"getrandom 0.4.2",
"js-sys",
"wasm-bindgen",
]
[[package]] [[package]]
name = "walkdir" name = "walkdir"
version = "2.5.0" version = "2.5.0"
@@ -2102,6 +2383,15 @@ dependencies = [
"wit-bindgen", "wit-bindgen",
] ]
[[package]]
name = "wasip3"
version = "0.4.0+wasi-0.3.0-rc-2026-01-06"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5"
dependencies = [
"wit-bindgen",
]
[[package]] [[package]]
name = "wasm-bindgen" name = "wasm-bindgen"
version = "0.2.115" version = "0.2.115"
@@ -2157,6 +2447,40 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "wasm-encoder"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319"
dependencies = [
"leb128fmt",
"wasmparser",
]
[[package]]
name = "wasm-metadata"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909"
dependencies = [
"anyhow",
"indexmap",
"wasm-encoder",
"wasmparser",
]
[[package]]
name = "wasmparser"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe"
dependencies = [
"bitflags",
"hashbrown 0.15.5",
"indexmap",
"semver",
]
[[package]] [[package]]
name = "web-sys" name = "web-sys"
version = "0.3.92" version = "0.3.92"
@@ -2177,6 +2501,15 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "webpki-roots"
version = "0.26.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9"
dependencies = [
"webpki-roots 1.0.6",
]
[[package]] [[package]]
name = "webpki-roots" name = "webpki-roots"
version = "1.0.6" version = "1.0.6"
@@ -2186,6 +2519,12 @@ dependencies = [
"rustls-pki-types", "rustls-pki-types",
] ]
[[package]]
name = "widestring"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72069c3113ab32ab29e5584db3c6ec55d416895e60715417b5b883a357c3e471"
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"
@@ -2223,6 +2562,35 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-registry"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720"
dependencies = [
"windows-link",
"windows-result",
"windows-strings",
]
[[package]]
name = "windows-result"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091"
dependencies = [
"windows-link",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.52.0" version = "0.52.0"
@@ -2390,6 +2758,88 @@ name = "wit-bindgen"
version = "0.51.0" version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5"
dependencies = [
"wit-bindgen-rust-macro",
]
[[package]]
name = "wit-bindgen-core"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc"
dependencies = [
"anyhow",
"heck",
"wit-parser",
]
[[package]]
name = "wit-bindgen-rust"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21"
dependencies = [
"anyhow",
"heck",
"indexmap",
"prettyplease",
"syn",
"wasm-metadata",
"wit-bindgen-core",
"wit-component",
]
[[package]]
name = "wit-bindgen-rust-macro"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a"
dependencies = [
"anyhow",
"prettyplease",
"proc-macro2",
"quote",
"syn",
"wit-bindgen-core",
"wit-bindgen-rust",
]
[[package]]
name = "wit-component"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2"
dependencies = [
"anyhow",
"bitflags",
"indexmap",
"log",
"serde",
"serde_derive",
"serde_json",
"wasm-encoder",
"wasm-metadata",
"wasmparser",
"wit-parser",
]
[[package]]
name = "wit-parser"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736"
dependencies = [
"anyhow",
"id-arena",
"indexmap",
"log",
"semver",
"serde",
"serde_derive",
"serde_json",
"unicode-xid",
"wasmparser",
]
[[package]] [[package]]
name = "writeable" name = "writeable"

View File

@@ -37,6 +37,8 @@ webpki-roots = "1"
criterion = { version = "0.8", features = ["html_reports"] } criterion = { version = "0.8", features = ["html_reports"] }
tower = { version = "0.5", features = ["util"] } tower = { version = "0.5", features = ["util"] }
http = "1" http = "1"
hickory-resolver = { version = "0.25", features = ["https-ring", "webpki-roots"] }
hickory-proto = "0.25"
[[bench]] [[bench]]
name = "hot_path" name = "hot_path"
@@ -49,3 +51,7 @@ harness = false
[[bench]] [[bench]]
name = "dnssec" name = "dnssec"
harness = false harness = false
[[bench]]
name = "recursive_compare"
harness = false

View File

@@ -0,0 +1,30 @@
[server]
bind_addr = "127.0.0.1:5454"
api_port = 5381
api_bind_addr = "127.0.0.1"
data_dir = "/tmp/numa-bench"
[upstream]
mode = "recursive"
timeout_ms = 10000
[cache]
min_ttl = 60
max_ttl = 3600
[blocking]
enabled = false
[proxy]
port = 8080
tls_port = 8443
[dot]
enabled = true
port = 8530
[mobile]
enabled = false
[lan]
enabled = false

31
benches/numa-bench.toml Normal file
View File

@@ -0,0 +1,31 @@
[server]
bind_addr = "127.0.0.1:5454"
api_port = 5381
api_bind_addr = "127.0.0.1"
data_dir = "/tmp/numa-bench"
[upstream]
mode = "forward"
address = ["https://9.9.9.9/dns-query"]
timeout_ms = 10000
[cache]
min_ttl = 60
max_ttl = 3600
[blocking]
enabled = false
[proxy]
port = 8080
tls_port = 8443
[dot]
enabled = true
port = 8530
[mobile]
enabled = false
[lan]
enabled = false

1095
benches/recursive_compare.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -15,9 +15,15 @@ api_port = 5380
# address = "9.9.9.9" # single upstream (plain UDP) # address = "9.9.9.9" # single upstream (plain UDP)
# address = ["192.168.1.1", "9.9.9.9:5353"] # multiple upstreams — SRTT picks fastest # address = ["192.168.1.1", "9.9.9.9:5353"] # multiple upstreams — SRTT picks fastest
# address = "https://dns.quad9.net/dns-query" # DNS-over-HTTPS (encrypted) # address = "https://dns.quad9.net/dns-query" # DNS-over-HTTPS (encrypted)
# address = "tls://9.9.9.9#dns.quad9.net" # DNS-over-TLS (encrypted, port 853)
# fallback = ["8.8.8.8", "1.1.1.1"] # tried only when all primaries fail # fallback = ["8.8.8.8", "1.1.1.1"] # tried only when all primaries fail
# port = 53 # default port for addresses without :port # port = 53 # default port for addresses without :port
# timeout_ms = 3000 # timeout_ms = 3000
# hedge_ms = 10 # request hedging delay (ms). After this delay
# # without a response, fires a parallel request
# # to the same upstream. Rescues packet loss (UDP),
# # dispatch spikes (DoH), TLS stalls (DoT).
# # Set to 0 to disable. Default: 10
# root_hints = [ # only used in recursive mode # root_hints = [ # only used in recursive mode
# "198.41.0.4", # a.root-servers.net (Verisign) # "198.41.0.4", # a.root-servers.net (Verisign)
# "199.9.14.201", # b.root-servers.net (USC-ISI) # "199.9.14.201", # b.root-servers.net (USC-ISI)
@@ -60,7 +66,7 @@ api_port = 5380
# allowlist = ["example.com"] # domains to never block # allowlist = ["example.com"] # domains to never block
[cache] [cache]
max_entries = 10000 max_entries = 100000
min_ttl = 60 min_ttl = 60
max_ttl = 86400 max_ttl = 86400
# warm = ["google.com", "github.com"] # resolve at startup, refresh before TTL expiry # warm = ["google.com", "github.com"] # resolve at startup, refresh before TTL expiry

View File

@@ -1012,6 +1012,7 @@ mod tests {
socket, socket,
zone_map: std::collections::HashMap::new(), zone_map: std::collections::HashMap::new(),
cache: RwLock::new(crate::cache::DnsCache::new(100, 60, 86400)), cache: RwLock::new(crate::cache::DnsCache::new(100, 60, 86400)),
refreshing: Mutex::new(std::collections::HashSet::new()),
stats: Mutex::new(crate::stats::ServerStats::new()), stats: Mutex::new(crate::stats::ServerStats::new()),
overrides: RwLock::new(crate::override_store::OverrideStore::new()), overrides: RwLock::new(crate::override_store::OverrideStore::new()),
blocklist: RwLock::new(crate::blocklist::BlocklistStore::new()), blocklist: RwLock::new(crate::blocklist::BlocklistStore::new()),
@@ -1029,6 +1030,7 @@ mod tests {
upstream_port: 53, upstream_port: 53,
lan_ip: Mutex::new(std::net::Ipv4Addr::LOCALHOST), lan_ip: Mutex::new(std::net::Ipv4Addr::LOCALHOST),
timeout: std::time::Duration::from_secs(3), timeout: std::time::Duration::from_secs(3),
hedge_delay: std::time::Duration::ZERO,
proxy_tld: "numa".to_string(), proxy_tld: "numa".to_string(),
proxy_tld_suffix: ".numa".to_string(), proxy_tld_suffix: ".numa".to_string(),
lan_enabled: false, lan_enabled: false,

View File

@@ -1,9 +1,26 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use crate::buffer::BytePacketBuffer;
use crate::packet::DnsPacket; use crate::packet::DnsPacket;
use crate::question::QueryType; use crate::question::QueryType;
use crate::record::DnsRecord; use crate::wire::WireMeta;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Freshness {
/// Within TTL, no action needed.
Fresh,
/// Within TTL but <10% remaining — trigger background prefetch.
NearExpiry,
/// Past TTL but within stale window — serve with TTL=1, trigger background refresh.
Stale,
}
impl Freshness {
pub fn needs_refresh(self) -> bool {
matches!(self, Freshness::NearExpiry | Freshness::Stale)
}
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum DnssecStatus { pub enum DnssecStatus {
@@ -26,14 +43,16 @@ impl DnssecStatus {
} }
struct CacheEntry { struct CacheEntry {
packet: DnsPacket, wire: Vec<u8>,
meta: WireMeta,
inserted_at: Instant, inserted_at: Instant,
ttl: Duration, ttl: Duration,
dnssec_status: DnssecStatus, dnssec_status: DnssecStatus,
} }
/// DNS cache using a two-level map (domain -> query_type -> entry) so that const STALE_WINDOW: Duration = Duration::from_secs(3600);
/// lookups can borrow `&str` instead of allocating a `String` key.
/// DNS cache with serve-stale (RFC 8767). Stores raw wire bytes.
pub struct DnsCache { pub struct DnsCache {
entries: HashMap<String, HashMap<QueryType, CacheEntry>>, entries: HashMap<String, HashMap<QueryType, CacheEntry>>,
entry_count: usize, entry_count: usize,
@@ -53,33 +72,118 @@ impl DnsCache {
} }
} }
/// Look up cached wire bytes, patching ID and TTLs in the returned copy.
/// Implements serve-stale (RFC 8767): expired entries within STALE_WINDOW
/// are returned with TTL=1 and `stale=true` so callers can revalidate.
pub fn lookup_wire(
&self,
domain: &str,
qtype: QueryType,
new_id: u16,
) -> Option<(Vec<u8>, DnssecStatus, Freshness)> {
let type_map = self.entries.get(domain)?;
let entry = type_map.get(&qtype)?;
let elapsed = entry.inserted_at.elapsed();
let (remaining, freshness) = if elapsed < entry.ttl {
let secs = (entry.ttl - elapsed).as_secs() as u32;
let f = if elapsed * 10 >= entry.ttl * 9 {
Freshness::NearExpiry
} else {
Freshness::Fresh
};
(secs.max(1), f)
} else if elapsed < entry.ttl + STALE_WINDOW {
(1, Freshness::Stale)
} else {
return None;
};
let mut wire = entry.wire.clone();
crate::wire::patch_id(&mut wire, new_id);
crate::wire::patch_ttls(&mut wire, &entry.meta.ttl_offsets, remaining);
Some((wire, entry.dnssec_status, freshness))
}
pub fn insert_wire(
&mut self,
domain: &str,
qtype: QueryType,
wire: &[u8],
dnssec_status: DnssecStatus,
) {
let meta = match crate::wire::scan_ttl_offsets(wire) {
Ok(m) => m,
Err(_) => return, // malformed wire, skip
};
if self.entry_count >= self.max_entries {
self.evict_expired();
if self.entry_count >= self.max_entries {
self.evict_stalest();
}
}
let min_ttl = crate::wire::min_ttl_from_wire(wire, &meta)
.unwrap_or(self.min_ttl)
.clamp(self.min_ttl, self.max_ttl);
let type_map = if let Some(existing) = self.entries.get_mut(domain) {
existing
} else {
self.entries.entry(domain.to_string()).or_default()
};
if !type_map.contains_key(&qtype) {
self.entry_count += 1;
}
type_map.insert(
qtype,
CacheEntry {
wire: wire.to_vec(),
meta,
inserted_at: Instant::now(),
ttl: Duration::from_secs(min_ttl as u64),
dnssec_status,
},
);
}
/// Read-only lookup — expired entries are left in place (cleaned up on insert). /// Read-only lookup — expired entries are left in place (cleaned up on insert).
pub fn lookup(&self, domain: &str, qtype: QueryType) -> Option<DnsPacket> { pub fn lookup(&self, domain: &str, qtype: QueryType) -> Option<DnsPacket> {
self.lookup_with_status(domain, qtype).map(|(pkt, _)| pkt) self.lookup_with_status(domain, qtype)
.map(|(pkt, _, _)| pkt)
} }
pub fn lookup_with_status( pub fn lookup_with_status(
&self, &self,
domain: &str, domain: &str,
qtype: QueryType, qtype: QueryType,
) -> Option<(DnsPacket, DnssecStatus)> { ) -> Option<(DnsPacket, DnssecStatus, Freshness)> {
let type_map = self.entries.get(domain)?; let (wire, status, freshness) = self.lookup_wire(domain, qtype, 0)?;
let entry = type_map.get(&qtype)?; let mut buf = BytePacketBuffer::from_bytes(&wire);
let pkt = DnsPacket::from_buffer(&mut buf).ok()?;
Some((pkt, status, freshness))
}
let elapsed = entry.inserted_at.elapsed(); pub fn insert(&mut self, domain: &str, qtype: QueryType, packet: &DnsPacket) {
if elapsed >= entry.ttl { self.insert_with_status(domain, qtype, packet, DnssecStatus::Indeterminate);
return None; }
pub fn insert_with_status(
&mut self,
domain: &str,
qtype: QueryType,
packet: &DnsPacket,
dnssec_status: DnssecStatus,
) {
let mut buf = BytePacketBuffer::new();
if packet.write(&mut buf).is_err() {
return;
} }
self.insert_wire(domain, qtype, buf.filled(), dnssec_status);
let remaining_secs = (entry.ttl - elapsed).as_secs() as u32;
let remaining = remaining_secs.max(1);
let mut packet = entry.packet.clone();
adjust_ttls(&mut packet.answers, remaining);
adjust_ttls(&mut packet.authorities, remaining);
adjust_ttls(&mut packet.resources, remaining);
Some((packet, entry.dnssec_status))
} }
pub fn ttl_remaining(&self, domain: &str, qtype: QueryType) -> Option<(u32, u32)> { pub fn ttl_remaining(&self, domain: &str, qtype: QueryType) -> Option<(u32, u32)> {
@@ -105,49 +209,6 @@ impl DnsCache {
false false
} }
pub fn insert(&mut self, domain: &str, qtype: QueryType, packet: &DnsPacket) {
self.insert_with_status(domain, qtype, packet, DnssecStatus::Indeterminate);
}
pub fn insert_with_status(
&mut self,
domain: &str,
qtype: QueryType,
packet: &DnsPacket,
dnssec_status: DnssecStatus,
) {
if self.entry_count >= self.max_entries {
self.evict_expired();
if self.entry_count >= self.max_entries {
return;
}
}
let min_ttl = extract_min_ttl(&packet.answers)
.unwrap_or(self.min_ttl)
.clamp(self.min_ttl, self.max_ttl);
let type_map = if let Some(existing) = self.entries.get_mut(domain) {
existing
} else {
self.entries.entry(domain.to_string()).or_default()
};
if !type_map.contains_key(&qtype) {
self.entry_count += 1;
}
type_map.insert(
qtype,
CacheEntry {
packet: packet.clone(),
inserted_at: Instant::now(),
ttl: Duration::from_secs(min_ttl as u64),
dnssec_status,
},
);
}
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.entry_count self.entry_count
} }
@@ -179,7 +240,8 @@ impl DnsCache {
+ 1; + 1;
total += type_map.capacity() * inner_slot; total += type_map.capacity() * inner_slot;
for entry in type_map.values() { for entry in type_map.values() {
total += entry.packet.heap_bytes(); total += entry.wire.capacity()
+ entry.meta.ttl_offsets.capacity() * std::mem::size_of::<usize>();
} }
} }
total total
@@ -220,6 +282,34 @@ impl DnsCache {
}); });
self.entry_count -= count; self.entry_count -= count;
} }
/// Evict the single entry closest to (or furthest past) expiry.
fn evict_stalest(&mut self) {
let mut worst: Option<(String, QueryType, Duration)> = None;
for (domain, type_map) in &self.entries {
for (qtype, entry) in type_map {
let age = entry.inserted_at.elapsed();
let remaining = entry.ttl.saturating_sub(age);
match &worst {
None => worst = Some((domain.clone(), *qtype, remaining)),
Some((_, _, w)) if remaining < *w => {
worst = Some((domain.clone(), *qtype, remaining));
}
_ => {}
}
}
}
if let Some((domain, qtype, _)) = worst {
if let Some(type_map) = self.entries.get_mut(&domain) {
if type_map.remove(&qtype).is_some() {
self.entry_count -= 1;
}
if type_map.is_empty() {
self.entries.remove(&domain);
}
}
}
}
} }
pub struct CacheInfo { pub struct CacheInfo {
@@ -228,20 +318,11 @@ pub struct CacheInfo {
pub ttl_remaining: u32, pub ttl_remaining: u32,
} }
fn extract_min_ttl(records: &[DnsRecord]) -> Option<u32> {
records.iter().map(|r| r.ttl()).min()
}
fn adjust_ttls(records: &mut [DnsRecord], new_ttl: u32) {
for record in records.iter_mut() {
record.set_ttl(new_ttl);
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::packet::DnsPacket; use crate::packet::DnsPacket;
use crate::record::DnsRecord;
#[test] #[test]
fn heap_bytes_grows_with_entries() { fn heap_bytes_grows_with_entries() {

View File

@@ -138,6 +138,8 @@ pub struct UpstreamConfig {
pub fallback: Vec<String>, pub fallback: Vec<String>,
#[serde(default = "default_timeout_ms")] #[serde(default = "default_timeout_ms")]
pub timeout_ms: u64, pub timeout_ms: u64,
#[serde(default = "default_hedge_ms")]
pub hedge_ms: u64,
#[serde(default = "default_root_hints")] #[serde(default = "default_root_hints")]
pub root_hints: Vec<String>, pub root_hints: Vec<String>,
#[serde(default = "default_prime_tlds")] #[serde(default = "default_prime_tlds")]
@@ -154,6 +156,7 @@ impl Default for UpstreamConfig {
port: default_upstream_port(), port: default_upstream_port(),
fallback: Vec::new(), fallback: Vec::new(),
timeout_ms: default_timeout_ms(), timeout_ms: default_timeout_ms(),
hedge_ms: default_hedge_ms(),
root_hints: default_root_hints(), root_hints: default_root_hints(),
prime_tlds: default_prime_tlds(), prime_tlds: default_prime_tlds(),
srtt: default_srtt(), srtt: default_srtt(),
@@ -271,6 +274,9 @@ fn default_upstream_port() -> u16 {
fn default_timeout_ms() -> u64 { fn default_timeout_ms() -> u64 {
5000 5000
} }
fn default_hedge_ms() -> u64 {
10
}
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct CacheConfig { pub struct CacheConfig {
@@ -296,7 +302,7 @@ impl Default for CacheConfig {
} }
fn default_max_entries() -> usize { fn default_max_entries() -> usize {
10000 100_000
} }
fn default_min_ttl() -> u32 { fn default_min_ttl() -> u32 {
60 60

View File

@@ -1,7 +1,7 @@
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime}; use std::time::{Duration, Instant, SystemTime};
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
@@ -16,7 +16,7 @@ use crate::blocklist::BlocklistStore;
use crate::buffer::BytePacketBuffer; use crate::buffer::BytePacketBuffer;
use crate::cache::{DnsCache, DnssecStatus}; use crate::cache::{DnsCache, DnssecStatus};
use crate::config::{UpstreamMode, ZoneMap}; use crate::config::{UpstreamMode, ZoneMap};
use crate::forward::{forward_query, forward_with_failover, Upstream, UpstreamPool}; use crate::forward::{forward_query_raw, forward_with_failover_raw, Upstream, UpstreamPool};
use crate::header::ResultCode; use crate::header::ResultCode;
use crate::health::HealthMeta; use crate::health::HealthMeta;
use crate::lan::PeerStore; use crate::lan::PeerStore;
@@ -35,6 +35,8 @@ pub struct ServerCtx {
pub zone_map: ZoneMap, pub zone_map: ZoneMap,
/// std::sync::RwLock (not tokio) — locks must never be held across .await points. /// std::sync::RwLock (not tokio) — locks must never be held across .await points.
pub cache: RwLock<DnsCache>, pub cache: RwLock<DnsCache>,
/// Domains currently being refreshed in the background (dedup guard).
pub refreshing: Mutex<HashSet<(String, QueryType)>>,
pub stats: Mutex<ServerStats>, pub stats: Mutex<ServerStats>,
pub overrides: RwLock<OverrideStore>, pub overrides: RwLock<OverrideStore>,
pub blocklist: RwLock<BlocklistStore>, pub blocklist: RwLock<BlocklistStore>,
@@ -47,6 +49,7 @@ pub struct ServerCtx {
pub upstream_port: u16, pub upstream_port: u16,
pub lan_ip: Mutex<std::net::Ipv4Addr>, pub lan_ip: Mutex<std::net::Ipv4Addr>,
pub timeout: Duration, pub timeout: Duration,
pub hedge_delay: Duration,
pub proxy_tld: String, pub proxy_tld: String,
pub proxy_tld_suffix: String, // pre-computed ".{tld}" to avoid per-query allocation pub proxy_tld_suffix: String, // pre-computed ".{tld}" to avoid per-query allocation
pub lan_enabled: bool, pub lan_enabled: bool,
@@ -81,8 +84,9 @@ pub struct ServerCtx {
/// (and logging parse errors) before calling this function. /// (and logging parse errors) before calling this function.
pub async fn resolve_query( pub async fn resolve_query(
query: DnsPacket, query: DnsPacket,
raw_wire: &[u8],
src_addr: SocketAddr, src_addr: SocketAddr,
ctx: &ServerCtx, ctx: &Arc<ServerCtx>,
) -> crate::Result<BytePacketBuffer> { ) -> crate::Result<BytePacketBuffer> {
let start = Instant::now(); let start = Instant::now();
@@ -164,7 +168,18 @@ pub async fn resolve_query(
(resp, QueryPath::Blocked, DnssecStatus::Indeterminate) (resp, QueryPath::Blocked, DnssecStatus::Indeterminate)
} else { } else {
let cached = ctx.cache.read().unwrap().lookup_with_status(&qname, qtype); let cached = ctx.cache.read().unwrap().lookup_with_status(&qname, qtype);
if let Some((cached, cached_dnssec)) = cached { if let Some((cached, cached_dnssec, freshness)) = cached {
if freshness.needs_refresh() {
let key = (qname.clone(), qtype);
let already = !ctx.refreshing.lock().unwrap().insert(key.clone());
if !already {
let ctx = Arc::clone(ctx);
tokio::spawn(async move {
refresh_entry(&ctx, &key.0, key.1).await;
ctx.refreshing.lock().unwrap().remove(&key);
});
}
}
let mut resp = cached; let mut resp = cached;
resp.header.id = query.header.id; resp.header.id = query.header.id;
if cached_dnssec == DnssecStatus::Secure { if cached_dnssec == DnssecStatus::Secure {
@@ -177,11 +192,8 @@ pub async fn resolve_query(
// Conditional forwarding takes priority over recursive mode // Conditional forwarding takes priority over recursive mode
// (e.g. Tailscale .ts.net, VPC private zones) // (e.g. Tailscale .ts.net, VPC private zones)
let upstream = Upstream::Udp(fwd_addr); let upstream = Upstream::Udp(fwd_addr);
match forward_query(&query, &upstream, ctx.timeout).await { match forward_and_cache(raw_wire, &upstream, ctx, &qname, qtype).await {
Ok(resp) => { Ok(resp) => (resp, QueryPath::Forwarded, DnssecStatus::Indeterminate),
ctx.cache.write().unwrap().insert(&qname, qtype, &resp);
(resp, QueryPath::Forwarded, DnssecStatus::Indeterminate)
}
Err(e) => { Err(e) => {
error!( error!(
"{} | {:?} {} | FORWARD ERROR | {}", "{} | {:?} {} | FORWARD ERROR | {}",
@@ -221,11 +233,26 @@ pub async fn resolve_query(
(resp, path, DnssecStatus::Indeterminate) (resp, path, DnssecStatus::Indeterminate)
} else { } else {
let pool = ctx.upstream_pool.lock().unwrap().clone(); let pool = ctx.upstream_pool.lock().unwrap().clone();
match forward_with_failover(&query, &pool, &ctx.srtt, ctx.timeout).await { match forward_with_failover_raw(
Ok(resp) => { raw_wire,
ctx.cache.write().unwrap().insert(&qname, qtype, &resp); &pool,
(resp, QueryPath::Forwarded, DnssecStatus::Indeterminate) &ctx.srtt,
} ctx.timeout,
ctx.hedge_delay,
)
.await
{
Ok(resp_wire) => match cache_and_parse(ctx, &qname, qtype, &resp_wire) {
Ok(resp) => (resp, QueryPath::Forwarded, DnssecStatus::Indeterminate),
Err(e) => {
error!("{} | {:?} {} | PARSE ERROR | {}", src_addr, qtype, qname, e);
(
DnsPacket::response_from(&query, ResultCode::SERVFAIL),
QueryPath::UpstreamError,
DnssecStatus::Indeterminate,
)
}
},
Err(e) => { Err(e) => {
error!( error!(
"{} | {:?} {} | UPSTREAM ERROR | {}", "{} | {:?} {} | UPSTREAM ERROR | {}",
@@ -347,11 +374,77 @@ pub async fn resolve_query(
Ok(resp_buffer) Ok(resp_buffer)
} }
/// Handle a DNS query received over UDP. Thin wrapper around resolve_query. fn cache_and_parse(
ctx: &ServerCtx,
qname: &str,
qtype: QueryType,
resp_wire: &[u8],
) -> crate::Result<DnsPacket> {
ctx.cache
.write()
.unwrap()
.insert_wire(qname, qtype, resp_wire, DnssecStatus::Indeterminate);
let mut buf = BytePacketBuffer::from_bytes(resp_wire);
DnsPacket::from_buffer(&mut buf)
}
/// Re-resolve a single (domain, qtype) and update the cache.
/// Used for both stale-entry refresh and proactive cache warming.
pub async fn refresh_entry(ctx: &ServerCtx, qname: &str, qtype: QueryType) {
let query = DnsPacket::query(0, qname, qtype);
if ctx.upstream_mode == UpstreamMode::Recursive {
if let Ok(resp) = crate::recursive::resolve_recursive(
qname,
qtype,
&ctx.cache,
&query,
&ctx.root_hints,
&ctx.srtt,
)
.await
{
ctx.cache.write().unwrap().insert(qname, qtype, &resp);
}
} else {
let mut buf = BytePacketBuffer::new();
if query.write(&mut buf).is_ok() {
let pool = ctx.upstream_pool.lock().unwrap().clone();
if let Ok(wire) = forward_with_failover_raw(
buf.filled(),
&pool,
&ctx.srtt,
ctx.timeout,
ctx.hedge_delay,
)
.await
{
ctx.cache.write().unwrap().insert_wire(
qname,
qtype,
&wire,
DnssecStatus::Indeterminate,
);
}
}
}
}
async fn forward_and_cache(
wire: &[u8],
upstream: &Upstream,
ctx: &ServerCtx,
qname: &str,
qtype: QueryType,
) -> crate::Result<DnsPacket> {
let resp_wire = forward_query_raw(wire, upstream, ctx.timeout).await?;
cache_and_parse(ctx, qname, qtype, &resp_wire)
}
pub async fn handle_query( pub async fn handle_query(
mut buffer: BytePacketBuffer, mut buffer: BytePacketBuffer,
raw_len: usize,
src_addr: SocketAddr, src_addr: SocketAddr,
ctx: &ServerCtx, ctx: &Arc<ServerCtx>,
) -> crate::Result<()> { ) -> crate::Result<()> {
let query = match DnsPacket::from_buffer(&mut buffer) { let query = match DnsPacket::from_buffer(&mut buffer) {
Ok(packet) => packet, Ok(packet) => packet,
@@ -360,7 +453,7 @@ pub async fn handle_query(
return Ok(()); return Ok(());
} }
}; };
match resolve_query(query, src_addr, ctx).await { match resolve_query(query, &buffer.buf[..raw_len], src_addr, ctx).await {
Ok(resp_buffer) => { Ok(resp_buffer) => {
ctx.socket.send_to(resp_buffer.filled(), src_addr).await?; ctx.socket.send_to(resp_buffer.filled(), src_addr).await?;
} }

View File

@@ -60,7 +60,11 @@ fn is_doh_host(host: Option<&str>, tld: &str) -> bool {
} }
} }
async fn resolve_doh(dns_bytes: &[u8], src: SocketAddr, ctx: &ServerCtx) -> Response { async fn resolve_doh(
dns_bytes: &[u8],
src: SocketAddr,
ctx: &std::sync::Arc<ServerCtx>,
) -> Response {
let mut buffer = BytePacketBuffer::from_bytes(dns_bytes); let mut buffer = BytePacketBuffer::from_bytes(dns_bytes);
let query = match DnsPacket::from_buffer(&mut buffer) { let query = match DnsPacket::from_buffer(&mut buffer) {
Ok(q) => q, Ok(q) => q,
@@ -82,7 +86,7 @@ async fn resolve_doh(dns_bytes: &[u8], src: SocketAddr, ctx: &ServerCtx) -> Resp
let query_rd = query.header.recursion_desired; let query_rd = query.header.recursion_desired;
let questions = query.questions.clone(); let questions = query.questions.clone();
match resolve_query(query, src, ctx).await { match resolve_query(query, dns_bytes, src, ctx).await {
Ok(resp_buffer) => { Ok(resp_buffer) => {
let min_ttl = extract_min_ttl(resp_buffer.filled()); let min_ttl = extract_min_ttl(resp_buffer.filled());
dns_response(resp_buffer.filled(), min_ttl) dns_response(resp_buffer.filled(), min_ttl)
@@ -102,11 +106,10 @@ async fn resolve_doh(dns_bytes: &[u8], src: SocketAddr, ctx: &ServerCtx) -> Resp
} }
fn extract_min_ttl(wire: &[u8]) -> u32 { fn extract_min_ttl(wire: &[u8]) -> u32 {
let mut buf = BytePacketBuffer::from_bytes(wire); crate::wire::scan_ttl_offsets(wire)
match DnsPacket::from_buffer(&mut buf) { .ok()
Ok(pkt) => pkt.answers.iter().map(|r| r.ttl()).min().unwrap_or(0), .and_then(|meta| crate::wire::min_ttl_from_wire(wire, &meta))
Err(_) => 0, .unwrap_or(0)
}
} }
fn dns_response(wire: &[u8], min_ttl: u32) -> Response { fn dns_response(wire: &[u8], min_ttl: u32) -> Response {

View File

@@ -153,8 +153,11 @@ async fn accept_loop(listener: TcpListener, acceptor: TlsAcceptor, ctx: Arc<Serv
/// Handle a single persistent DoT connection (RFC 7858). /// Handle a single persistent DoT connection (RFC 7858).
/// Reads length-prefixed DNS queries until EOF, idle timeout, or error. /// Reads length-prefixed DNS queries until EOF, idle timeout, or error.
async fn handle_dot_connection<S>(mut stream: S, remote_addr: SocketAddr, ctx: &ServerCtx) async fn handle_dot_connection<S>(
where mut stream: S,
remote_addr: SocketAddr,
ctx: &std::sync::Arc<ServerCtx>,
) where
S: AsyncReadExt + AsyncWriteExt + Unpin, S: AsyncReadExt + AsyncWriteExt + Unpin,
{ {
loop { loop {
@@ -177,8 +180,6 @@ where
break; break;
}; };
// Parse query up-front so we can echo its question section in SERVFAIL
// responses when resolve_query fails.
let query = match DnsPacket::from_buffer(&mut buffer) { let query = match DnsPacket::from_buffer(&mut buffer) {
Ok(q) => q, Ok(q) => q,
Err(e) => { Err(e) => {
@@ -200,7 +201,7 @@ where
} }
}; };
match resolve_query(query.clone(), remote_addr, ctx).await { match resolve_query(query.clone(), &buffer.buf[..msg_len], remote_addr, ctx).await {
Ok(resp_buffer) => { Ok(resp_buffer) => {
if write_framed(&mut stream, resp_buffer.filled()) if write_framed(&mut stream, resp_buffer.filled())
.await .await
@@ -355,6 +356,7 @@ mod tests {
m m
}, },
cache: RwLock::new(crate::cache::DnsCache::new(100, 60, 86400)), cache: RwLock::new(crate::cache::DnsCache::new(100, 60, 86400)),
refreshing: Mutex::new(std::collections::HashSet::new()),
stats: Mutex::new(crate::stats::ServerStats::new()), stats: Mutex::new(crate::stats::ServerStats::new()),
overrides: RwLock::new(crate::override_store::OverrideStore::new()), overrides: RwLock::new(crate::override_store::OverrideStore::new()),
blocklist: RwLock::new(crate::blocklist::BlocklistStore::new()), blocklist: RwLock::new(crate::blocklist::BlocklistStore::new()),
@@ -370,6 +372,7 @@ mod tests {
upstream_port: 53, upstream_port: 53,
lan_ip: Mutex::new(std::net::Ipv4Addr::LOCALHOST), lan_ip: Mutex::new(std::net::Ipv4Addr::LOCALHOST),
timeout: Duration::from_millis(200), timeout: Duration::from_millis(200),
hedge_delay: Duration::ZERO,
proxy_tld: "numa".to_string(), proxy_tld: "numa".to_string(),
proxy_tld_suffix: ".numa".to_string(), proxy_tld_suffix: ".numa".to_string(),
lan_enabled: false, lan_enabled: false,

View File

@@ -65,6 +65,13 @@ pub fn parse_upstream(s: &str, default_port: u16) -> Result<Upstream> {
if s.starts_with("https://") { if s.starts_with("https://") {
let client = reqwest::Client::builder() let client = reqwest::Client::builder()
.use_rustls_tls() .use_rustls_tls()
.http2_initial_stream_window_size(65_535)
.http2_initial_connection_window_size(65_535)
.http2_keep_alive_interval(Duration::from_secs(15))
.http2_keep_alive_while_idle(true)
.http2_keep_alive_timeout(Duration::from_secs(10))
.pool_idle_timeout(Duration::from_secs(300))
.pool_max_idle_per_host(1)
.build() .build()
.unwrap_or_default(); .unwrap_or_default();
return Ok(Upstream::Doh { return Ok(Upstream::Doh {
@@ -150,72 +157,16 @@ impl UpstreamPool {
} }
} }
pub async fn forward_with_failover(
query: &DnsPacket,
pool: &UpstreamPool,
srtt: &RwLock<SrttCache>,
timeout_duration: Duration,
) -> Result<DnsPacket> {
// Build candidate list: primary (sorted by SRTT for UDP) then fallback
let mut candidates: Vec<(usize, u64)> = pool
.primary
.iter()
.enumerate()
.map(|(i, u)| {
let rtt = match u {
Upstream::Udp(addr) => srtt.read().unwrap().get(addr.ip()),
_ => 0, // DoH: keep config order (stable sort preserves it)
};
(i, rtt)
})
.collect();
candidates.sort_by_key(|&(_, rtt)| rtt);
let all_upstreams: Vec<&Upstream> = candidates
.iter()
.map(|&(i, _)| &pool.primary[i])
.chain(pool.fallback.iter())
.collect();
let mut last_err: Option<Box<dyn std::error::Error + Send + Sync>> = None;
for upstream in &all_upstreams {
let start = Instant::now();
match forward_query(query, upstream, timeout_duration).await {
Ok(resp) => {
if let Upstream::Udp(addr) = upstream {
let rtt_ms = start.elapsed().as_millis() as u64;
srtt.write().unwrap().record_rtt(addr.ip(), rtt_ms, false);
}
return Ok(resp);
}
Err(e) => {
if let Upstream::Udp(addr) = upstream {
srtt.write().unwrap().record_failure(addr.ip());
}
log::debug!("upstream {} failed: {}", upstream, e);
last_err = Some(e);
}
}
}
Err(last_err.unwrap_or_else(|| "no upstream configured".into()))
}
pub async fn forward_query( pub async fn forward_query(
query: &DnsPacket, query: &DnsPacket,
upstream: &Upstream, upstream: &Upstream,
timeout_duration: Duration, timeout_duration: Duration,
) -> Result<DnsPacket> { ) -> Result<DnsPacket> {
match upstream { let mut send_buffer = BytePacketBuffer::new();
Upstream::Udp(addr) => forward_udp(query, *addr, timeout_duration).await, query.write(&mut send_buffer)?;
Upstream::Doh { url, client } => forward_doh(query, url, client, timeout_duration).await, let data = forward_query_raw(send_buffer.filled(), upstream, timeout_duration).await?;
Upstream::Dot { let mut recv_buffer = BytePacketBuffer::from_bytes(&data);
addr, DnsPacket::from_buffer(&mut recv_buffer)
tls_name,
connector,
} => forward_dot(query, *addr, tls_name, connector, timeout_duration).await,
}
} }
pub(crate) async fn forward_udp( pub(crate) async fn forward_udp(
@@ -223,24 +174,10 @@ pub(crate) async fn forward_udp(
upstream: SocketAddr, upstream: SocketAddr,
timeout_duration: Duration, timeout_duration: Duration,
) -> Result<DnsPacket> { ) -> Result<DnsPacket> {
let socket = UdpSocket::bind("0.0.0.0:0").await?;
let mut send_buffer = BytePacketBuffer::new(); let mut send_buffer = BytePacketBuffer::new();
query.write(&mut send_buffer)?; query.write(&mut send_buffer)?;
let data = forward_udp_raw(send_buffer.filled(), upstream, timeout_duration).await?;
socket.send_to(send_buffer.filled(), upstream).await?; let mut recv_buffer = BytePacketBuffer::from_bytes(&data);
let mut recv_buffer = BytePacketBuffer::new();
let (size, _) = timeout(timeout_duration, socket.recv_from(&mut recv_buffer.buf)).await??;
if size == recv_buffer.buf.len() {
log::debug!(
"upstream response truncated ({} bytes, buffer {})",
size,
recv_buffer.buf.len()
);
}
DnsPacket::from_buffer(&mut recv_buffer) DnsPacket::from_buffer(&mut recv_buffer)
} }
@@ -277,13 +214,13 @@ pub(crate) async fn forward_tcp(
DnsPacket::from_buffer(&mut recv_buffer) DnsPacket::from_buffer(&mut recv_buffer)
} }
async fn forward_dot( async fn forward_dot_raw(
query: &DnsPacket, wire: &[u8],
addr: SocketAddr, addr: SocketAddr,
tls_name: &Option<String>, tls_name: &Option<String>,
connector: &tokio_rustls::TlsConnector, connector: &tokio_rustls::TlsConnector,
timeout_duration: Duration, timeout_duration: Duration,
) -> Result<DnsPacket> { ) -> Result<Vec<u8>> {
use rustls::pki_types::ServerName; use rustls::pki_types::ServerName;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream; use tokio::net::TcpStream;
@@ -296,10 +233,6 @@ async fn forward_dot(
let tcp = timeout(timeout_duration, TcpStream::connect(addr)).await??; let tcp = timeout(timeout_duration, TcpStream::connect(addr)).await??;
let mut tls = timeout(timeout_duration, connector.connect(server_name, tcp)).await??; let mut tls = timeout(timeout_duration, connector.connect(server_name, tcp)).await??;
let mut send_buffer = BytePacketBuffer::new();
query.write(&mut send_buffer)?;
let wire = send_buffer.filled();
let mut outbuf = Vec::with_capacity(2 + wire.len()); let mut outbuf = Vec::with_capacity(2 + wire.len());
outbuf.extend_from_slice(&(wire.len() as u16).to_be_bytes()); outbuf.extend_from_slice(&(wire.len() as u16).to_be_bytes());
outbuf.extend_from_slice(wire); outbuf.extend_from_slice(wire);
@@ -312,26 +245,178 @@ async fn forward_dot(
let mut data = vec![0u8; resp_len]; let mut data = vec![0u8; resp_len];
timeout(timeout_duration, tls.read_exact(&mut data)).await??; timeout(timeout_duration, tls.read_exact(&mut data)).await??;
let mut recv_buffer = BytePacketBuffer::from_bytes(&data); Ok(data)
DnsPacket::from_buffer(&mut recv_buffer)
} }
async fn forward_doh( pub async fn forward_query_raw(
query: &DnsPacket, wire: &[u8],
upstream: &Upstream,
timeout_duration: Duration,
) -> Result<Vec<u8>> {
match upstream {
Upstream::Udp(addr) => forward_udp_raw(wire, *addr, timeout_duration).await,
Upstream::Doh { url, client } => forward_doh_raw(wire, url, client, timeout_duration).await,
Upstream::Dot {
addr,
tls_name,
connector,
} => forward_dot_raw(wire, *addr, tls_name, connector, timeout_duration).await,
}
}
pub async fn forward_with_hedging_raw(
wire: &[u8],
primary: &Upstream,
secondary: &Upstream,
hedge_delay: Duration,
timeout_duration: Duration,
) -> Result<Vec<u8>> {
use tokio::time::sleep;
let primary_fut = forward_query_raw(wire, primary, timeout_duration);
tokio::pin!(primary_fut);
let delay = sleep(hedge_delay);
tokio::pin!(delay);
// Phase 1: wait for either primary to return, or the hedge delay.
tokio::select! {
result = &mut primary_fut => return result,
_ = &mut delay => {}
}
// Phase 2: hedge delay expired — fire secondary while still polling primary.
let secondary_fut = forward_query_raw(wire, secondary, timeout_duration);
tokio::pin!(secondary_fut);
// First successful response wins. If one errors, wait for the other.
let mut primary_err: Option<crate::Error> = None;
let mut secondary_err: Option<crate::Error> = None;
loop {
tokio::select! {
r = &mut primary_fut, if primary_err.is_none() => {
match r {
Ok(resp) => return Ok(resp),
Err(e) => {
if let Some(se) = secondary_err.take() {
return Err(se);
}
primary_err = Some(e);
}
}
}
r = &mut secondary_fut, if secondary_err.is_none() => {
match r {
Ok(resp) => return Ok(resp),
Err(e) => {
if let Some(pe) = primary_err.take() {
return Err(pe);
}
secondary_err = Some(e);
}
}
}
}
match (primary_err, secondary_err) {
(Some(pe), Some(_)) => return Err(pe),
(pe, se) => {
primary_err = pe;
secondary_err = se;
}
}
}
}
pub async fn forward_with_failover_raw(
wire: &[u8],
pool: &UpstreamPool,
srtt: &RwLock<SrttCache>,
timeout_duration: Duration,
hedge_delay: Duration,
) -> Result<Vec<u8>> {
let mut candidates: Vec<(usize, u64)> = pool
.primary
.iter()
.enumerate()
.map(|(i, u)| {
let rtt = match u {
Upstream::Udp(addr) => srtt.read().unwrap().get(addr.ip()),
_ => 0,
};
(i, rtt)
})
.collect();
candidates.sort_by_key(|&(_, rtt)| rtt);
let all_upstreams: Vec<&Upstream> = candidates
.iter()
.map(|&(i, _)| &pool.primary[i])
.chain(pool.fallback.iter())
.collect();
let mut last_err: Option<Box<dyn std::error::Error + Send + Sync>> = None;
for upstream in &all_upstreams {
let start = Instant::now();
let result = if !hedge_delay.is_zero() {
// Hedge against the same upstream: independent h2 streams (DoH),
// independent UDP packets (plain DNS), or independent TLS
// connections (DoT). Rescues packet loss, dispatch spikes, and
// TLS handshake stalls.
forward_with_hedging_raw(wire, upstream, upstream, hedge_delay, timeout_duration).await
} else {
forward_query_raw(wire, upstream, timeout_duration).await
};
match result {
Ok(resp) => {
if let Upstream::Udp(addr) = upstream {
let rtt_ms = start.elapsed().as_millis() as u64;
srtt.write().unwrap().record_rtt(addr.ip(), rtt_ms, false);
}
return Ok(resp);
}
Err(e) => {
if let Upstream::Udp(addr) = upstream {
srtt.write().unwrap().record_failure(addr.ip());
}
log::debug!("upstream {} failed: {}", upstream, e);
last_err = Some(e);
}
}
}
Err(last_err.unwrap_or_else(|| "no upstream configured".into()))
}
async fn forward_udp_raw(
wire: &[u8],
upstream: SocketAddr,
timeout_duration: Duration,
) -> Result<Vec<u8>> {
let socket = UdpSocket::bind("0.0.0.0:0").await?;
socket.send_to(wire, upstream).await?;
let mut recv_buf = vec![0u8; 4096];
let (size, _) = timeout(timeout_duration, socket.recv_from(&mut recv_buf)).await??;
recv_buf.truncate(size);
Ok(recv_buf)
}
async fn forward_doh_raw(
wire: &[u8],
url: &str, url: &str,
client: &reqwest::Client, client: &reqwest::Client,
timeout_duration: Duration, timeout_duration: Duration,
) -> Result<DnsPacket> { ) -> Result<Vec<u8>> {
let mut send_buffer = BytePacketBuffer::new();
query.write(&mut send_buffer)?;
let resp = timeout( let resp = timeout(
timeout_duration, timeout_duration,
client client
.post(url) .post(url)
.header("content-type", "application/dns-message") .header("content-type", "application/dns-message")
.header("accept", "application/dns-message") .header("accept", "application/dns-message")
.body(send_buffer.filled().to_vec()) .body(wire.to_vec())
.send(), .send(),
) )
.await?? .await??
@@ -339,9 +424,25 @@ async fn forward_doh(
let bytes = resp.bytes().await?; let bytes = resp.bytes().await?;
log::debug!("DoH response: {} bytes", bytes.len()); log::debug!("DoH response: {} bytes", bytes.len());
Ok(bytes.to_vec())
}
let mut recv_buffer = BytePacketBuffer::from_bytes(&bytes); /// Send a lightweight keepalive query to a DoH upstream to prevent
DnsPacket::from_buffer(&mut recv_buffer) /// the HTTP/2 + TLS connection from going idle and being torn down.
pub async fn keepalive_doh(upstream: &Upstream) {
if let Upstream::Doh { url, client } = upstream {
// Query for . NS — minimal, always succeeds, response is small
let wire: &[u8] = &[
0x00, 0x00, // ID
0x01, 0x00, // flags: RD=1
0x00, 0x01, // QDCOUNT=1
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // AN=0, NS=0, AR=0
0x00, // root name (.)
0x00, 0x02, // type NS
0x00, 0x01, // class IN
];
let _ = forward_doh_raw(wire, url, client, Duration::from_secs(5)).await;
}
} }
#[cfg(test)] #[cfg(test)]
@@ -556,10 +657,19 @@ mod tests {
); );
let srtt = RwLock::new(SrttCache::new(true)); let srtt = RwLock::new(SrttCache::new(true));
let result = forward_with_failover(&query, &pool, &srtt, Duration::from_millis(500)) let wire = to_wire(&query);
.await let resp_wire = forward_with_failover_raw(
.expect("should fail over to second upstream"); &wire,
&pool,
&srtt,
Duration::from_millis(500),
Duration::ZERO,
)
.await
.expect("should fail over to second upstream");
let mut buf = BytePacketBuffer::from_bytes(&resp_wire);
let result = DnsPacket::from_buffer(&mut buf).unwrap();
assert_eq!(result.header.id, 0xABCD); assert_eq!(result.header.id, 0xABCD);
assert_eq!(result.answers.len(), 1); assert_eq!(result.answers.len(), 1);
} }

View File

@@ -26,6 +26,7 @@ pub mod srtt;
pub mod stats; pub mod stats;
pub mod system_dns; pub mod system_dns;
pub mod tls; pub mod tls;
pub mod wire;
pub type Error = Box<dyn std::error::Error + Send + Sync>; pub type Error = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -285,6 +285,7 @@ async fn main() -> numa::Result<()> {
config.cache.min_ttl, config.cache.min_ttl,
config.cache.max_ttl, config.cache.max_ttl,
)), )),
refreshing: Mutex::new(std::collections::HashSet::new()),
stats: Mutex::new(ServerStats::new()), stats: Mutex::new(ServerStats::new()),
overrides: RwLock::new(OverrideStore::new()), overrides: RwLock::new(OverrideStore::new()),
blocklist: RwLock::new(blocklist), blocklist: RwLock::new(blocklist),
@@ -297,6 +298,7 @@ async fn main() -> numa::Result<()> {
upstream_port: config.upstream.port, upstream_port: config.upstream.port,
lan_ip: Mutex::new(numa::lan::detect_lan_ip().unwrap_or(std::net::Ipv4Addr::LOCALHOST)), lan_ip: Mutex::new(numa::lan::detect_lan_ip().unwrap_or(std::net::Ipv4Addr::LOCALHOST)),
timeout: Duration::from_millis(config.upstream.timeout_ms), timeout: Duration::from_millis(config.upstream.timeout_ms),
hedge_delay: Duration::from_millis(config.upstream.hedge_ms),
proxy_tld_suffix: if config.proxy.tld.is_empty() { proxy_tld_suffix: if config.proxy.tld.is_empty() {
String::new() String::new()
} else { } else {
@@ -511,6 +513,14 @@ async fn main() -> numa::Result<()> {
}); });
} }
// Spawn DoH connection keepalive — prevents idle TLS teardown
{
let keepalive_ctx = Arc::clone(&ctx);
tokio::spawn(async move {
doh_keepalive_loop(keepalive_ctx).await;
});
}
// Spawn HTTP API server // Spawn HTTP API server
let api_ctx = Arc::clone(&ctx); let api_ctx = Arc::clone(&ctx);
let api_addr: SocketAddr = format!("{}:{}", config.server.api_bind_addr, api_port).parse()?; let api_addr: SocketAddr = format!("{}:{}", config.server.api_bind_addr, api_port).parse()?;
@@ -590,7 +600,7 @@ async fn main() -> numa::Result<()> {
#[allow(clippy::infinite_loop)] #[allow(clippy::infinite_loop)]
loop { loop {
let mut buffer = BytePacketBuffer::new(); let mut buffer = BytePacketBuffer::new();
let (_, src_addr) = match ctx.socket.recv_from(&mut buffer.buf).await { let (len, src_addr) = match ctx.socket.recv_from(&mut buffer.buf).await {
Ok(r) => r, Ok(r) => r,
Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => { Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => {
// Windows delivers ICMP port-unreachable as ConnectionReset on UDP sockets // Windows delivers ICMP port-unreachable as ConnectionReset on UDP sockets
@@ -598,10 +608,9 @@ async fn main() -> numa::Result<()> {
} }
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
}; };
let ctx = Arc::clone(&ctx); let ctx = Arc::clone(&ctx);
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = handle_query(buffer, src_addr, &ctx).await { if let Err(e) = handle_query(buffer, len, src_addr, &ctx).await {
error!("{} | HANDLER ERROR | {}", src_addr, e); error!("{} | HANDLER ERROR | {}", src_addr, e);
} }
}); });
@@ -749,30 +758,22 @@ async fn load_blocklists(ctx: &ServerCtx, lists: &[String]) {
} }
async fn warm_domain(ctx: &ServerCtx, domain: &str) { async fn warm_domain(ctx: &ServerCtx, domain: &str) {
use numa::question::QueryType; for qtype in [
numa::question::QueryType::A,
numa::question::QueryType::AAAA,
] {
numa::ctx::refresh_entry(ctx, domain, qtype).await;
}
}
for qtype in [QueryType::A, QueryType::AAAA] { async fn doh_keepalive_loop(ctx: Arc<ServerCtx>) {
let query = numa::packet::DnsPacket::query(0, domain, qtype); let mut interval = tokio::time::interval(Duration::from_secs(25));
let result = if ctx.upstream_mode == numa::config::UpstreamMode::Recursive { interval.tick().await; // skip first immediate tick
numa::recursive::resolve_recursive( loop {
domain, interval.tick().await;
qtype, let pool = ctx.upstream_pool.lock().unwrap().clone();
&ctx.cache, if let Some(upstream) = pool.preferred() {
&query, numa::forward::keepalive_doh(upstream).await;
&ctx.root_hints,
&ctx.srtt,
)
.await
} else {
let pool = ctx.upstream_pool.lock().unwrap().clone();
numa::forward::forward_with_failover(&query, &pool, &ctx.srtt, ctx.timeout).await
};
match result {
Ok(resp) => {
ctx.cache.write().unwrap().insert(domain, qtype, &resp);
log::debug!("cache warm: {} {:?}", domain, qtype);
}
Err(e) => log::warn!("cache warm: {} {:?} failed: {}", domain, qtype, e),
} }
} }
} }

View File

@@ -15,8 +15,8 @@ use crate::srtt::SrttCache;
const MAX_REFERRAL_DEPTH: u8 = 10; const MAX_REFERRAL_DEPTH: u8 = 10;
const MAX_CNAME_DEPTH: u8 = 8; const MAX_CNAME_DEPTH: u8 = 8;
const NS_QUERY_TIMEOUT: Duration = Duration::from_millis(800); const NS_QUERY_TIMEOUT: Duration = Duration::from_millis(400);
const TCP_TIMEOUT: Duration = Duration::from_millis(1500); const TCP_TIMEOUT: Duration = Duration::from_millis(400);
const UDP_FAIL_THRESHOLD: u8 = 3; const UDP_FAIL_THRESHOLD: u8 = 3;
static QUERY_ID: AtomicU16 = AtomicU16::new(1); static QUERY_ID: AtomicU16 = AtomicU16::new(1);
@@ -202,23 +202,24 @@ pub(crate) fn resolve_iterative<'a>(
let mut ns_idx = 0; let mut ns_idx = 0;
for _ in 0..MAX_REFERRAL_DEPTH { for _ in 0..MAX_REFERRAL_DEPTH {
let ns_addr = match ns_addrs.get(ns_idx) { if ns_idx >= ns_addrs.len() {
Some(addr) => *addr, return Err("no nameserver available".into());
None => return Err("no nameserver available".into()), }
};
let (q_name, q_type) = minimize_query(qname, qtype, &current_zone); let (q_name, q_type) = minimize_query(qname, qtype, &current_zone);
debug!( debug!(
"recursive: querying {} for {:?} {} (zone: {}, depth {})", "recursive: querying {} (+ hedge) for {:?} {} (zone: {}, depth {})",
ns_addr, q_type, q_name, current_zone, referral_depth ns_addrs[ns_idx], q_type, q_name, current_zone, referral_depth
); );
let response = match send_query(q_name, q_type, ns_addr, srtt).await { let response = match send_query_hedged(q_name, q_type, &ns_addrs[ns_idx..], srtt).await
{
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
debug!("recursive: NS {} failed: {}", ns_addr, e); debug!("recursive: NS query failed: {}", e);
ns_idx += 1; let remaining = ns_addrs.len().saturating_sub(ns_idx);
ns_idx += remaining.min(2);
continue; continue;
} }
}; };
@@ -228,6 +229,9 @@ pub(crate) fn resolve_iterative<'a>(
{ {
if let Some(zone) = referral_zone(&response) { if let Some(zone) = referral_zone(&response) {
current_zone = zone; current_zone = zone;
let mut cache_w = cache.write().unwrap();
cache_ns_delegation(&mut cache_w, &current_zone, &response);
drop(cache_w);
} }
let mut all_ns = extract_ns_from_records(&response.answers); let mut all_ns = extract_ns_from_records(&response.answers);
if all_ns.is_empty() { if all_ns.is_empty() {
@@ -296,6 +300,7 @@ pub(crate) fn resolve_iterative<'a>(
{ {
let mut cache_w = cache.write().unwrap(); let mut cache_w = cache.write().unwrap();
cache_ns_delegation(&mut cache_w, &current_zone, &response);
cache_ds_from_authority(&mut cache_w, &response); cache_ds_from_authority(&mut cache_w, &response);
} }
let mut new_ns_addrs = resolve_ns_addrs_from_glue(&response, &ns_names, cache); let mut new_ns_addrs = resolve_ns_addrs_from_glue(&response, &ns_names, cache);
@@ -560,6 +565,23 @@ fn cache_ds_from_authority(cache: &mut DnsCache, response: &DnsPacket) {
} }
} }
/// Cache NS delegation records from a referral response so that
/// `find_closest_ns` can skip re-querying TLD servers on subsequent lookups.
fn cache_ns_delegation(cache: &mut DnsCache, zone: &str, response: &DnsPacket) {
let ns_records: Vec<_> = response
.authorities
.iter()
.filter(|r| matches!(r, DnsRecord::NS { .. }))
.cloned()
.collect();
if ns_records.is_empty() {
return;
}
let mut pkt = make_glue_packet();
pkt.answers = ns_records;
cache.insert(zone, QueryType::NS, &pkt);
}
fn make_glue_packet() -> DnsPacket { fn make_glue_packet() -> DnsPacket {
let mut pkt = DnsPacket::new(); let mut pkt = DnsPacket::new();
pkt.header.response = true; pkt.header.response = true;
@@ -587,6 +609,115 @@ async fn tcp_with_srtt(
} }
} }
/// Smart NS query: fire to two servers simultaneously when SRTT is unknown
/// (cold queries), or to the best server with SRTT-based hedge when known.
async fn send_query_hedged(
qname: &str,
qtype: QueryType,
servers: &[SocketAddr],
srtt: &RwLock<SrttCache>,
) -> crate::Result<DnsPacket> {
if servers.is_empty() {
return Err("no nameserver available".into());
}
if servers.len() == 1 {
return send_query(qname, qtype, servers[0], srtt).await;
}
let primary = servers[0];
let secondary = servers[1];
let primary_known = srtt.read().unwrap().is_known(primary.ip());
if !primary_known {
// Cold: fire both simultaneously, first response wins
debug!(
"recursive: parallel query to {} and {} for {:?} {}",
primary, secondary, qtype, qname
);
let fut_a = send_query(qname, qtype, primary, srtt);
let fut_b = send_query(qname, qtype, secondary, srtt);
tokio::pin!(fut_a);
tokio::pin!(fut_b);
// First Ok wins. If one errors, wait for the other.
let mut a_done = false;
let mut b_done = false;
let mut a_err: Option<crate::Error> = None;
let mut b_err: Option<crate::Error> = None;
loop {
tokio::select! {
r = &mut fut_a, if !a_done => {
match r {
Ok(resp) => return Ok(resp),
Err(e) => { a_done = true; a_err = Some(e); }
}
}
r = &mut fut_b, if !b_done => {
match r {
Ok(resp) => return Ok(resp),
Err(e) => { b_done = true; b_err = Some(e); }
}
}
}
match (a_err.take(), b_err.take()) {
(Some(e), Some(_)) => return Err(e),
(a, b) => {
a_err = a;
b_err = b;
}
}
}
} else {
// Warm: send to best, hedge after SRTT × 3 if slow
let hedge_ms = srtt.read().unwrap().get(primary.ip()) * 3;
let hedge_delay = Duration::from_millis(hedge_ms.max(50));
let fut_a = send_query(qname, qtype, primary, srtt);
tokio::pin!(fut_a);
let delay = tokio::time::sleep(hedge_delay);
tokio::pin!(delay);
tokio::select! {
r = &mut fut_a => return r,
_ = &mut delay => {}
}
debug!(
"recursive: hedging {} -> {} after {}ms for {:?} {}",
primary, secondary, hedge_ms, qtype, qname
);
let fut_b = send_query(qname, qtype, secondary, srtt);
tokio::pin!(fut_b);
// First Ok wins; if one errors, wait for the other.
let mut a_err: Option<crate::Error> = None;
let mut b_err: Option<crate::Error> = None;
loop {
tokio::select! {
r = &mut fut_a, if a_err.is_none() => {
match r {
Ok(resp) => return Ok(resp),
Err(e) => {
if b_err.is_some() { return Err(e); }
a_err = Some(e);
}
}
}
r = &mut fut_b, if b_err.is_none() => {
match r {
Ok(resp) => return Ok(resp),
Err(e) => {
if let Some(ae) = a_err.take() { return Err(ae); }
b_err = Some(e);
}
}
}
}
}
}
}
async fn send_query( async fn send_query(
qname: &str, qname: &str,
qtype: QueryType, qtype: QueryType,
@@ -634,9 +765,13 @@ async fn send_query(
"send_query: {} consecutive UDP failures — switching to TCP-first", "send_query: {} consecutive UDP failures — switching to TCP-first",
fails fails
); );
// Now that UDP is disabled, retry this query via TCP
return tcp_with_srtt(&query, server, srtt, start).await;
} }
debug!("send_query: UDP failed for {}: {}, trying TCP", server, e); // UDP works in general (priming succeeded) but this server timed out.
tcp_with_srtt(&query, server, srtt, start).await // Don't waste another 400ms on TCP — the server is unreachable.
srtt.write().unwrap().record_failure(server.ip());
Err(e)
} }
} }
} }
@@ -678,6 +813,10 @@ mod tests {
use super::*; use super::*;
use std::net::{Ipv4Addr, Ipv6Addr}; use std::net::{Ipv4Addr, Ipv6Addr};
/// Tests that mutate the global UDP_DISABLED / UDP_FAILURES flags must hold
/// this lock to avoid racing with each other under `cargo test` parallelism.
static UDP_STATE_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
#[test] #[test]
fn extract_ns_from_authority() { fn extract_ns_from_authority() {
let mut pkt = DnsPacket::new(); let mut pkt = DnsPacket::new();
@@ -916,10 +1055,11 @@ mod tests {
} }
/// TCP-only server returns authoritative answer directly. /// TCP-only server returns authoritative answer directly.
/// Verifies: UDP fails → TCP fallback → resolves. /// Verifies: when UDP is disabled, TCP-first resolves.
#[tokio::test] #[tokio::test]
async fn tcp_fallback_resolves_when_udp_blocked() { async fn tcp_fallback_resolves_when_udp_blocked() {
UDP_DISABLED.store(false, Ordering::Relaxed); let _guard = UDP_STATE_LOCK.lock().unwrap();
UDP_DISABLED.store(true, Ordering::Relaxed);
UDP_FAILURES.store(0, Ordering::Release); UDP_FAILURES.store(0, Ordering::Release);
let server_addr = spawn_tcp_dns_server(|query| { let server_addr = spawn_tcp_dns_server(|query| {
@@ -950,49 +1090,32 @@ mod tests {
} }
} }
/// Full iterative resolution through TCP-only mock: root referral → authoritative answer. /// TCP round-trip through mock: query → authoritative answer via forward_tcp.
/// The mock plays both roles (returns referral for NS queries, answer for A queries). /// Uses forward_tcp directly to avoid dependence on the global UDP_DISABLED flag
/// which is shared across concurrent tests.
#[tokio::test] #[tokio::test]
async fn tcp_only_iterative_resolution() { async fn tcp_only_iterative_resolution() {
UDP_DISABLED.store(true, Ordering::Release); // Skip UDP entirely for speed
let server_addr = spawn_tcp_dns_server(|query| { let server_addr = spawn_tcp_dns_server(|query| {
let q = match query.questions.first() { let q = match query.questions.first() {
Some(q) => q, Some(q) => q,
None => return DnsPacket::response_from(query, ResultCode::SERVFAIL), None => return DnsPacket::response_from(query, ResultCode::SERVFAIL),
}; };
if q.qtype == QueryType::NS || q.name == "com" { let mut resp = DnsPacket::response_from(query, ResultCode::NOERROR);
// Return referral — NS points back to ourselves (same IP, port 53 in glue resp.header.authoritative_answer = true;
// won't work, but cache will have our address from root_hints) resp.answers.push(DnsRecord::A {
let mut resp = DnsPacket::new(); domain: q.name.clone(),
resp.header.id = query.header.id; addr: Ipv4Addr::new(10, 0, 0, 42),
resp.header.response = true; ttl: 300,
resp.header.rescode = ResultCode::NOERROR; });
resp.questions = query.questions.clone(); resp
resp.authorities.push(DnsRecord::NS {
domain: "com".into(),
host: "ns1.com".into(),
ttl: 3600,
});
resp
} else {
// Return authoritative answer
let mut resp = DnsPacket::response_from(query, ResultCode::NOERROR);
resp.header.authoritative_answer = true;
resp.answers.push(DnsRecord::A {
domain: q.name.clone(),
addr: Ipv4Addr::new(10, 0, 0, 42),
ttl: 300,
});
resp
}
}) })
.await; .await;
let srtt = RwLock::new(SrttCache::new(true)); let query = DnsPacket::query(0x1234, "hello.example.com", QueryType::A);
let result = send_query("hello.example.com", QueryType::A, server_addr, &srtt).await; let resp = crate::forward::forward_tcp(&query, server_addr, TCP_TIMEOUT)
let resp = result.expect("TCP-only send_query should work"); .await
.expect("TCP query should work");
assert_eq!(resp.header.rescode, ResultCode::NOERROR); assert_eq!(resp.header.rescode, ResultCode::NOERROR);
match &resp.answers[0] { match &resp.answers[0] {
DnsRecord::A { addr, .. } => assert_eq!(*addr, Ipv4Addr::new(10, 0, 0, 42)), DnsRecord::A { addr, .. } => assert_eq!(*addr, Ipv4Addr::new(10, 0, 0, 42)),
@@ -1002,7 +1125,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn tcp_fallback_handles_nxdomain() { async fn tcp_fallback_handles_nxdomain() {
UDP_DISABLED.store(false, Ordering::Relaxed); let _guard = UDP_STATE_LOCK.lock().unwrap();
UDP_DISABLED.store(true, Ordering::Relaxed);
UDP_FAILURES.store(0, Ordering::Release); UDP_FAILURES.store(0, Ordering::Release);
let server_addr = spawn_tcp_dns_server(|query| { let server_addr = spawn_tcp_dns_server(|query| {
@@ -1034,6 +1158,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn udp_auto_disable_resets() { async fn udp_auto_disable_resets() {
let _guard = UDP_STATE_LOCK.lock().unwrap();
UDP_DISABLED.store(true, Ordering::Release); UDP_DISABLED.store(true, Ordering::Release);
UDP_FAILURES.store(5, Ordering::Relaxed); UDP_FAILURES.store(5, Ordering::Relaxed);

View File

@@ -45,6 +45,11 @@ impl SrttCache {
} }
} }
/// Whether we have observed RTT data for this IP.
pub fn is_known(&self, ip: IpAddr) -> bool {
self.entries.contains_key(&ip)
}
/// Apply time-based decay: each DECAY_AFTER_SECS period halves distance to INITIAL. /// Apply time-based decay: each DECAY_AFTER_SECS period halves distance to INITIAL.
fn decayed_srtt(entry: &SrttEntry) -> u64 { fn decayed_srtt(entry: &SrttEntry) -> u64 {
Self::decay_for_age(entry.srtt_ms, entry.updated_at.elapsed().as_secs()) Self::decay_for_age(entry.srtt_ms, entry.updated_at.elapsed().as_secs())

1416
src/wire.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -53,7 +53,17 @@ CONF
echo "Starting Numa on :$PORT ($SUITE_NAME)..." echo "Starting Numa on :$PORT ($SUITE_NAME)..."
RUST_LOG=info "$BINARY" "$CONFIG" > "$LOG" 2>&1 & RUST_LOG=info "$BINARY" "$CONFIG" > "$LOG" 2>&1 &
NUMA_PID=$! NUMA_PID=$!
sleep 4 sleep 2
# Wait for blocklist to load (if blocking is enabled in this suite)
if echo "$SUITE_CONFIG" | grep -q 'enabled = true'; then
for i in $(seq 1 20); do
LOADED=$(curl -sf http://127.0.0.1:$API_PORT/blocking/stats 2>/dev/null \
| grep -o '"domains_loaded":[0-9]*' | cut -d: -f2)
if [ "${LOADED:-0}" -gt 0 ]; then break; fi
sleep 1
done
fi
if ! kill -0 "$NUMA_PID" 2>/dev/null; then if ! kill -0 "$NUMA_PID" 2>/dev/null; then
echo "Failed to start Numa:" echo "Failed to start Numa:"