feat: accept array of upstreams in [[forwarding]]

Mirrors `[upstream] address` — `upstream` accepts string or array
of strings, builds an `UpstreamPool` and routes queries through
`forward_with_failover_raw` so SRTT ordering and failover apply to
matched `[[forwarding]]` rules the same way they do for the default
pool.

Single-string rules keep their current behavior (one-element pool,
equivalent single-upstream path). Empty array errors at config load.

Addresses item 1 of issue #102. Plan: docs/102_item1.md.
This commit is contained in:
Razvan Dimescu
2026-04-15 04:03:38 +03:00
parent 120ba5200e
commit 9a0d586b13
6 changed files with 172 additions and 53 deletions

View File

@@ -66,6 +66,13 @@ api_port = 5380
# [[forwarding]] # DoH upstream: full https:// URL # [[forwarding]] # DoH upstream: full https:// URL
# suffix = "example.corp" # suffix = "example.corp"
# upstream = "https://dns.quad9.net/dns-query" # upstream = "https://dns.quad9.net/dns-query"
#
# [[forwarding]] # array of upstreams → SRTT-aware failover
# suffix = ["google.com", "goog"] # fastest-healthy first, dead one skipped
# upstream = [
# "tls://9.9.9.9#dns.quad9.net",
# "tls://149.112.112.112#dns.quad9.net",
# ]
# [blocking] # [blocking]
# enabled = true # set to false to disable ad blocking # enabled = true # set to false to disable ad blocking

View File

@@ -41,17 +41,30 @@ pub struct Config {
pub struct ForwardingRuleConfig { pub struct ForwardingRuleConfig {
#[serde(deserialize_with = "string_or_vec")] #[serde(deserialize_with = "string_or_vec")]
pub suffix: Vec<String>, pub suffix: Vec<String>,
pub upstream: String, #[serde(deserialize_with = "string_or_vec")]
pub upstream: Vec<String>,
} }
impl ForwardingRuleConfig { impl ForwardingRuleConfig {
fn to_runtime_rules(&self) -> Result<Vec<crate::system_dns::ForwardingRule>> { fn to_runtime_rules(&self) -> Result<Vec<crate::system_dns::ForwardingRule>> {
let upstream = crate::forward::parse_upstream(&self.upstream, 53) if self.upstream.is_empty() {
.map_err(|e| format!("forwarding rule for upstream '{}': {}", self.upstream, e))?; return Err(format!(
"forwarding rule for suffix {:?}: upstream must not be empty",
self.suffix
)
.into());
}
let mut primary = Vec::with_capacity(self.upstream.len());
for s in &self.upstream {
let u = crate::forward::parse_upstream(s, 53)
.map_err(|e| format!("forwarding rule for upstream '{}': {}", s, e))?;
primary.push(u);
}
let pool = crate::forward::UpstreamPool::new(primary, vec![]);
Ok(self Ok(self
.suffix .suffix
.iter() .iter()
.map(|s| crate::system_dns::ForwardingRule::new(s.clone(), upstream.clone())) .map(|s| crate::system_dns::ForwardingRule::new(s.clone(), pool.clone()))
.collect()) .collect())
} }
} }
@@ -643,7 +656,7 @@ mod tests {
let config: Config = toml::from_str(toml).unwrap(); let config: Config = toml::from_str(toml).unwrap();
assert_eq!(config.forwarding.len(), 1); assert_eq!(config.forwarding.len(), 1);
assert_eq!(config.forwarding[0].suffix, &["home.local"]); assert_eq!(config.forwarding[0].suffix, &["home.local"]);
assert_eq!(config.forwarding[0].upstream, "100.90.1.63:5361"); assert_eq!(config.forwarding[0].upstream, vec!["100.90.1.63:5361"]);
} }
#[test] #[test]
@@ -671,7 +684,7 @@ mod tests {
"#; "#;
let config: Config = toml::from_str(toml).unwrap(); let config: Config = toml::from_str(toml).unwrap();
assert_eq!(config.forwarding.len(), 2); assert_eq!(config.forwarding.len(), 2);
assert_eq!(config.forwarding[1].upstream, "10.0.0.1"); assert_eq!(config.forwarding[1].upstream, vec!["10.0.0.1"]);
} }
#[test] #[test]
@@ -693,28 +706,29 @@ mod tests {
fn forwarding_suffix_array_expands_to_multiple_runtime_rules() { fn forwarding_suffix_array_expands_to_multiple_runtime_rules() {
let rule = ForwardingRuleConfig { let rule = ForwardingRuleConfig {
suffix: vec!["168.192.in-addr.arpa".to_string(), "onsite".to_string()], suffix: vec!["168.192.in-addr.arpa".to_string(), "onsite".to_string()],
upstream: "192.168.88.1".to_string(), upstream: vec!["192.168.88.1".to_string()],
}; };
let runtime = rule.to_runtime_rules().unwrap(); let runtime = rule.to_runtime_rules().unwrap();
assert_eq!(runtime.len(), 2); assert_eq!(runtime.len(), 2);
assert_eq!(runtime[0].suffix, "168.192.in-addr.arpa"); assert_eq!(runtime[0].suffix, "168.192.in-addr.arpa");
assert_eq!(runtime[1].suffix, "onsite"); assert_eq!(runtime[1].suffix, "onsite");
assert_eq!(runtime[0].upstream, runtime[1].upstream); assert_eq!(
runtime[0].upstream.preferred(),
runtime[1].upstream.preferred()
);
} }
#[test] #[test]
fn forwarding_upstream_with_explicit_port() { fn forwarding_upstream_with_explicit_port() {
let rule = ForwardingRuleConfig { let rule = ForwardingRuleConfig {
suffix: vec!["home.local".to_string()], suffix: vec!["home.local".to_string()],
upstream: "100.90.1.63:5361".to_string(), upstream: vec!["100.90.1.63:5361".to_string()],
}; };
let runtime = rule.to_runtime_rules().unwrap(); let runtime = rule.to_runtime_rules().unwrap();
assert_eq!(runtime.len(), 1); assert_eq!(runtime.len(), 1);
assert!(matches!( let preferred = runtime[0].upstream.preferred().unwrap();
runtime[0].upstream, assert!(matches!(preferred, crate::forward::Upstream::Udp(_)));
crate::forward::Upstream::Udp(_) assert_eq!(preferred.to_string(), "100.90.1.63:5361");
));
assert_eq!(runtime[0].upstream.to_string(), "100.90.1.63:5361");
assert_eq!(runtime[0].suffix, "home.local"); assert_eq!(runtime[0].suffix, "home.local");
} }
@@ -722,17 +736,20 @@ mod tests {
fn forwarding_upstream_defaults_to_port_53() { fn forwarding_upstream_defaults_to_port_53() {
let rule = ForwardingRuleConfig { let rule = ForwardingRuleConfig {
suffix: vec!["home.local".to_string()], suffix: vec!["home.local".to_string()],
upstream: "100.90.1.63".to_string(), upstream: vec!["100.90.1.63".to_string()],
}; };
let runtime = rule.to_runtime_rules().unwrap(); let runtime = rule.to_runtime_rules().unwrap();
assert_eq!(runtime[0].upstream.to_string(), "100.90.1.63:53"); assert_eq!(
runtime[0].upstream.preferred().unwrap().to_string(),
"100.90.1.63:53"
);
} }
#[test] #[test]
fn forwarding_invalid_upstream_returns_error() { fn forwarding_invalid_upstream_returns_error() {
let rule = ForwardingRuleConfig { let rule = ForwardingRuleConfig {
suffix: vec!["home.local".to_string()], suffix: vec!["home.local".to_string()],
upstream: "not-a-valid-host".to_string(), upstream: vec!["not-a-valid-host".to_string()],
}; };
assert!(rule.to_runtime_rules().is_err()); assert!(rule.to_runtime_rules().is_err());
} }
@@ -741,14 +758,14 @@ mod tests {
fn forwarding_upstream_accepts_dot_scheme() { fn forwarding_upstream_accepts_dot_scheme() {
let rule = ForwardingRuleConfig { let rule = ForwardingRuleConfig {
suffix: vec!["google.com".to_string()], suffix: vec!["google.com".to_string()],
upstream: "tls://9.9.9.9#dns.quad9.net".to_string(), upstream: vec!["tls://9.9.9.9#dns.quad9.net".to_string()],
}; };
let runtime = rule let runtime = rule
.to_runtime_rules() .to_runtime_rules()
.expect("tls:// upstream should parse"); .expect("tls:// upstream should parse");
assert_eq!(runtime.len(), 1); assert_eq!(runtime.len(), 1);
assert_eq!( assert_eq!(
runtime[0].upstream.to_string(), runtime[0].upstream.preferred().unwrap().to_string(),
"tls://9.9.9.9:853#dns.quad9.net" "tls://9.9.9.9:853#dns.quad9.net"
); );
} }
@@ -757,14 +774,14 @@ mod tests {
fn forwarding_upstream_accepts_doh_scheme() { fn forwarding_upstream_accepts_doh_scheme() {
let rule = ForwardingRuleConfig { let rule = ForwardingRuleConfig {
suffix: vec!["goog".to_string()], suffix: vec!["goog".to_string()],
upstream: "https://dns.quad9.net/dns-query".to_string(), upstream: vec!["https://dns.quad9.net/dns-query".to_string()],
}; };
let runtime = rule let runtime = rule
.to_runtime_rules() .to_runtime_rules()
.expect("https:// upstream should parse"); .expect("https:// upstream should parse");
assert_eq!(runtime.len(), 1); assert_eq!(runtime.len(), 1);
assert_eq!( assert_eq!(
runtime[0].upstream.to_string(), runtime[0].upstream.preferred().unwrap().to_string(),
"https://dns.quad9.net/dns-query" "https://dns.quad9.net/dns-query"
); );
} }
@@ -773,44 +790,90 @@ mod tests {
fn forwarding_config_rules_take_precedence_over_discovered() { fn forwarding_config_rules_take_precedence_over_discovered() {
let config_rules = vec![ForwardingRuleConfig { let config_rules = vec![ForwardingRuleConfig {
suffix: vec!["home.local".to_string()], suffix: vec!["home.local".to_string()],
upstream: "10.0.0.1:53".to_string(), upstream: vec!["10.0.0.1:53".to_string()],
}]; }];
let discovered = vec![crate::system_dns::ForwardingRule::new( let discovered = vec![crate::system_dns::ForwardingRule::new(
"home.local".to_string(), "home.local".to_string(),
crate::forward::Upstream::Udp("192.168.1.1:53".parse().unwrap()), crate::forward::UpstreamPool::new(
vec![crate::forward::Upstream::Udp(
"192.168.1.1:53".parse().unwrap(),
)],
vec![],
),
)]; )];
let merged = merge_forwarding_rules(&config_rules, discovered).unwrap(); let merged = merge_forwarding_rules(&config_rules, discovered).unwrap();
let picked = crate::system_dns::match_forwarding_rule("host.home.local", &merged) let picked = crate::system_dns::match_forwarding_rule("host.home.local", &merged)
.expect("rule should match"); .expect("rule should match");
assert_eq!(picked.to_string(), "10.0.0.1:53"); assert_eq!(picked.preferred().unwrap().to_string(), "10.0.0.1:53");
} }
#[test] #[test]
fn forwarding_merge_preserves_non_overlapping_discovered() { fn forwarding_merge_preserves_non_overlapping_discovered() {
let config_rules = vec![ForwardingRuleConfig { let config_rules = vec![ForwardingRuleConfig {
suffix: vec!["home.local".to_string()], suffix: vec!["home.local".to_string()],
upstream: "10.0.0.1:53".to_string(), upstream: vec!["10.0.0.1:53".to_string()],
}]; }];
let discovered = vec![crate::system_dns::ForwardingRule::new( let discovered = vec![crate::system_dns::ForwardingRule::new(
"corp.example".to_string(), "corp.example".to_string(),
crate::forward::Upstream::Udp("192.168.1.1:53".parse().unwrap()), crate::forward::UpstreamPool::new(
vec![crate::forward::Upstream::Udp(
"192.168.1.1:53".parse().unwrap(),
)],
vec![],
),
)]; )];
let merged = merge_forwarding_rules(&config_rules, discovered).unwrap(); let merged = merge_forwarding_rules(&config_rules, discovered).unwrap();
assert_eq!(merged.len(), 2); assert_eq!(merged.len(), 2);
let picked = crate::system_dns::match_forwarding_rule("host.corp.example", &merged) let picked = crate::system_dns::match_forwarding_rule("host.corp.example", &merged)
.expect("discovered rule should still match"); .expect("discovered rule should still match");
assert_eq!(picked.to_string(), "192.168.1.1:53"); assert_eq!(picked.preferred().unwrap().to_string(), "192.168.1.1:53");
} }
#[test] #[test]
fn forwarding_merge_suffix_array_expands_to_multiple_rules() { fn forwarding_merge_suffix_array_expands_to_multiple_rules() {
let config_rules = vec![ForwardingRuleConfig { let config_rules = vec![ForwardingRuleConfig {
suffix: vec!["a.local".to_string(), "b.local".to_string()], suffix: vec!["a.local".to_string(), "b.local".to_string()],
upstream: "10.0.0.1:53".to_string(), upstream: vec!["10.0.0.1:53".to_string()],
}]; }];
let merged = merge_forwarding_rules(&config_rules, vec![]).unwrap(); let merged = merge_forwarding_rules(&config_rules, vec![]).unwrap();
assert_eq!(merged.len(), 2); assert_eq!(merged.len(), 2);
} }
#[test]
fn forwarding_parses_upstream_array() {
let toml = r#"
[[forwarding]]
suffix = "google.com"
upstream = ["tls://9.9.9.9#dns.quad9.net", "tls://149.112.112.112#dns.quad9.net"]
"#;
let config: Config = toml::from_str(toml).unwrap();
assert_eq!(config.forwarding.len(), 1);
assert_eq!(config.forwarding[0].upstream.len(), 2);
}
#[test]
fn forwarding_upstream_array_builds_pool_with_multiple_primaries() {
let rule = ForwardingRuleConfig {
suffix: vec!["google.com".to_string()],
upstream: vec![
"tls://9.9.9.9#dns.quad9.net".to_string(),
"tls://149.112.112.112#dns.quad9.net".to_string(),
],
};
let runtime = rule.to_runtime_rules().unwrap();
assert_eq!(runtime.len(), 1);
let label = runtime[0].upstream.label();
assert!(label.contains("+1 more"), "label was: {}", label);
}
#[test]
fn forwarding_empty_upstream_array_errors() {
let rule = ForwardingRuleConfig {
suffix: vec!["home.local".to_string()],
upstream: vec![],
};
assert!(rule.to_runtime_rules().is_err());
}
} }
pub struct ConfigLoad { pub struct ConfigLoad {

View File

@@ -16,7 +16,9 @@ use crate::blocklist::BlocklistStore;
use crate::buffer::BytePacketBuffer; use crate::buffer::BytePacketBuffer;
use crate::cache::{DnsCache, DnssecStatus}; use crate::cache::{DnsCache, DnssecStatus};
use crate::config::{UpstreamMode, ZoneMap}; use crate::config::{UpstreamMode, ZoneMap};
use crate::forward::{forward_query_raw, forward_with_failover_raw, Upstream, UpstreamPool}; use crate::forward::{forward_with_failover_raw, UpstreamPool};
#[cfg(test)]
use crate::forward::Upstream;
use crate::header::ResultCode; use crate::header::ResultCode;
use crate::health::HealthMeta; use crate::health::HealthMeta;
use crate::lan::PeerStore; use crate::lan::PeerStore;
@@ -190,13 +192,31 @@ pub async fn resolve_query(
resp.header.authed_data = true; resp.header.authed_data = true;
} }
(resp, QueryPath::Cached, cached_dnssec) (resp, QueryPath::Cached, cached_dnssec)
} else if let Some(upstream) = } else if let Some(pool) =
crate::system_dns::match_forwarding_rule(&qname, &ctx.forwarding_rules) crate::system_dns::match_forwarding_rule(&qname, &ctx.forwarding_rules)
{ {
// Conditional forwarding takes priority over recursive mode // Conditional forwarding takes priority over recursive mode
// (e.g. Tailscale .ts.net, VPC private zones) // (e.g. Tailscale .ts.net, VPC private zones)
match forward_and_cache(raw_wire, upstream, ctx, &qname, qtype).await { match forward_with_failover_raw(
raw_wire,
pool,
&ctx.srtt,
ctx.timeout,
ctx.hedge_delay,
)
.await
{
Ok(resp_wire) => match cache_and_parse(ctx, &qname, qtype, &resp_wire) {
Ok(resp) => (resp, QueryPath::Forwarded, DnssecStatus::Indeterminate), Ok(resp) => (resp, QueryPath::Forwarded, DnssecStatus::Indeterminate),
Err(e) => {
error!("{} | {:?} {} | PARSE ERROR | {}", src_addr, qtype, qname, e);
(
DnsPacket::response_from(&query, ResultCode::SERVFAIL),
QueryPath::UpstreamError,
DnssecStatus::Indeterminate,
)
}
},
Err(e) => { Err(e) => {
error!( error!(
"{} | {:?} {} | FORWARD ERROR | {}", "{} | {:?} {} | FORWARD ERROR | {}",
@@ -433,17 +453,6 @@ pub async fn refresh_entry(ctx: &ServerCtx, qname: &str, qtype: QueryType) {
} }
} }
async fn forward_and_cache(
wire: &[u8],
upstream: &Upstream,
ctx: &ServerCtx,
qname: &str,
qtype: QueryType,
) -> crate::Result<DnsPacket> {
let resp_wire = forward_query_raw(wire, upstream, ctx.timeout).await?;
cache_and_parse(ctx, qname, qtype, &resp_wire)
}
pub async fn handle_query( pub async fn handle_query(
mut buffer: BytePacketBuffer, mut buffer: BytePacketBuffer,
raw_len: usize, raw_len: usize,
@@ -1082,7 +1091,7 @@ mod tests {
let mut ctx = crate::testutil::test_ctx().await; let mut ctx = crate::testutil::test_ctx().await;
ctx.forwarding_rules = vec![ForwardingRule::new( ctx.forwarding_rules = vec![ForwardingRule::new(
"168.192.in-addr.arpa".to_string(), "168.192.in-addr.arpa".to_string(),
Upstream::Udp(upstream_addr), UpstreamPool::new(vec![Upstream::Udp(upstream_addr)], vec![]),
)]; )];
let ctx = Arc::new(ctx); let ctx = Arc::new(ctx);
@@ -1237,7 +1246,7 @@ mod tests {
let mut ctx = crate::testutil::test_ctx().await; let mut ctx = crate::testutil::test_ctx().await;
ctx.forwarding_rules = vec![ForwardingRule::new( ctx.forwarding_rules = vec![ForwardingRule::new(
"corp".to_string(), "corp".to_string(),
Upstream::Udp(upstream_addr), UpstreamPool::new(vec![Upstream::Udp(upstream_addr)], vec![]),
)]; )];
let ctx = Arc::new(ctx); let ctx = Arc::new(ctx);
@@ -1253,4 +1262,38 @@ mod tests {
other => panic!("expected A record, got {:?}", other), other => panic!("expected A record, got {:?}", other),
} }
} }
#[tokio::test]
async fn pipeline_forwarding_fails_over_to_second_upstream() {
let dead = crate::testutil::blackhole_upstream();
let mut live_resp = DnsPacket::new();
live_resp.header.response = true;
live_resp.header.rescode = ResultCode::NOERROR;
live_resp.answers.push(DnsRecord::A {
domain: "internal.corp".to_string(),
addr: Ipv4Addr::new(10, 9, 9, 9),
ttl: 600,
});
let live = crate::testutil::mock_upstream(live_resp).await;
let mut ctx = crate::testutil::test_ctx().await;
ctx.forwarding_rules = vec![ForwardingRule::new(
"corp".to_string(),
UpstreamPool::new(
vec![Upstream::Udp(dead), Upstream::Udp(live)],
vec![],
),
)];
let ctx = Arc::new(ctx);
let (resp, path) = resolve_in_test(&ctx, "internal.corp", QueryType::A).await;
assert_eq!(path, QueryPath::Forwarded);
assert_eq!(resp.header.rescode, ResultCode::NOERROR);
assert_eq!(resp.answers.len(), 1);
match &resp.answers[0] {
DnsRecord::A { addr, .. } => assert_eq!(*addr, Ipv4Addr::new(10, 9, 9, 9)),
other => panic!("expected A record, got {:?}", other),
}
}
} }

View File

@@ -118,7 +118,7 @@ fn build_dot_connector() -> Result<tokio_rustls::TlsConnector> {
))) )))
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct UpstreamPool { pub struct UpstreamPool {
primary: Vec<Upstream>, primary: Vec<Upstream>,
fallback: Vec<Upstream>, fallback: Vec<Upstream>,

View File

@@ -212,7 +212,11 @@ async fn main() -> numa::Result<()> {
for fwd in &config.forwarding { for fwd in &config.forwarding {
for suffix in &fwd.suffix { for suffix in &fwd.suffix {
info!("forwarding .{} to {} (config rule)", suffix, fwd.upstream); info!(
"forwarding .{} to {} (config rule)",
suffix,
fwd.upstream.join(", ")
);
} }
} }
let forwarding_rules = let forwarding_rules =

View File

@@ -2,7 +2,7 @@ use std::net::SocketAddr;
use log::info; use log::info;
use crate::forward::Upstream; use crate::forward::{Upstream, UpstreamPool};
fn print_recursive_hint() { fn print_recursive_hint() {
let is_recursive = crate::config::load_config("numa.toml") let is_recursive = crate::config::load_config("numa.toml")
@@ -24,11 +24,11 @@ fn is_loopback_or_stub(addr: &str) -> bool {
pub struct ForwardingRule { pub struct ForwardingRule {
pub suffix: String, pub suffix: String,
dot_suffix: String, // pre-computed ".suffix" for zero-alloc matching dot_suffix: String, // pre-computed ".suffix" for zero-alloc matching
pub upstream: Upstream, pub upstream: UpstreamPool,
} }
impl ForwardingRule { impl ForwardingRule {
pub fn new(suffix: String, upstream: Upstream) -> Self { pub fn new(suffix: String, upstream: UpstreamPool) -> Self {
let dot_suffix = format!(".{}", suffix); let dot_suffix = format!(".{}", suffix);
Self { Self {
suffix, suffix,
@@ -216,7 +216,8 @@ fn discover_macos() -> SystemDnsInfo {
for rule in &rules { for rule in &rules {
info!( info!(
"auto-discovered forwarding: *.{} -> {}", "auto-discovered forwarding: *.{} -> {}",
rule.suffix, rule.upstream rule.suffix,
rule.upstream.label()
); );
} }
if rules.is_empty() { if rules.is_empty() {
@@ -235,7 +236,8 @@ fn discover_macos() -> SystemDnsInfo {
#[cfg(any(target_os = "macos", target_os = "linux"))] #[cfg(any(target_os = "macos", target_os = "linux"))]
fn make_rule(domain: &str, nameserver: &str) -> Option<ForwardingRule> { fn make_rule(domain: &str, nameserver: &str) -> Option<ForwardingRule> {
let addr = crate::forward::parse_upstream_addr(nameserver, 53).ok()?; let addr = crate::forward::parse_upstream_addr(nameserver, 53).ok()?;
Some(ForwardingRule::new(domain.to_string(), Upstream::Udp(addr))) let pool = UpstreamPool::new(vec![Upstream::Udp(addr)], vec![]);
Some(ForwardingRule::new(domain.to_string(), pool))
} }
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
@@ -827,7 +829,7 @@ fn uninstall_windows() -> Result<(), String> {
pub fn match_forwarding_rule<'a>( pub fn match_forwarding_rule<'a>(
domain: &str, domain: &str,
rules: &'a [ForwardingRule], rules: &'a [ForwardingRule],
) -> Option<&'a Upstream> { ) -> Option<&'a UpstreamPool> {
for rule in rules { for rule in rules {
if domain == rule.suffix || domain.ends_with(&rule.dot_suffix) { if domain == rule.suffix || domain.ends_with(&rule.dot_suffix) {
return Some(&rule.upstream); return Some(&rule.upstream);