add ad blocking, live dashboard, system DNS auto-discovery
- DNS-level ad blocking: 385K+ domains via Hagezi Pro blocklist, subdomain matching, one-click allowlist, pause/toggle, background refresh every 24h - Live dashboard at :5380 with real-time stats, query log, override management (create/edit/delete), blocking controls - System DNS auto-discovery: parses scutil --dns on macOS to find conditional forwarding rules (Tailscale, VPN split-DNS) - REST API expanded to 18 endpoints (blocking, overrides, diagnostics) - Startup banner with colored system info - Performance benchmarks (bench/dns-bench.sh) - Landing page updated with new positioning and comparison table - CI, Dockerfile, LICENSE, development plan docs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
215
src/main.rs
215
src/main.rs
@@ -1,47 +1,48 @@
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::Duration;
|
||||
|
||||
use log::{debug, error, info, warn};
|
||||
use log::{error, info};
|
||||
use tokio::net::UdpSocket;
|
||||
|
||||
use dns_fun::buffer::BytePacketBuffer;
|
||||
use dns_fun::cache::DnsCache;
|
||||
use dns_fun::config::{build_zone_map, load_config};
|
||||
use dns_fun::forward::forward_query;
|
||||
use dns_fun::header::ResultCode;
|
||||
use dns_fun::packet::DnsPacket;
|
||||
use dns_fun::question::QueryType;
|
||||
use dns_fun::record::DnsRecord;
|
||||
use dns_fun::stats::{QueryPath, ServerStats};
|
||||
|
||||
struct ServerCtx {
|
||||
socket: Arc<UdpSocket>,
|
||||
zone_map: HashMap<(String, QueryType), Vec<DnsRecord>>,
|
||||
cache: Mutex<DnsCache>,
|
||||
stats: Mutex<ServerStats>,
|
||||
upstream: SocketAddr,
|
||||
timeout: Duration,
|
||||
}
|
||||
use numa::blocklist::{download_blocklists, parse_blocklist, BlocklistStore};
|
||||
use numa::buffer::BytePacketBuffer;
|
||||
use numa::cache::DnsCache;
|
||||
use numa::config::{build_zone_map, load_config};
|
||||
use numa::ctx::{handle_query, ServerCtx};
|
||||
use numa::override_store::OverrideStore;
|
||||
use numa::query_log::QueryLog;
|
||||
use numa::stats::ServerStats;
|
||||
use numa::system_dns::discover_forwarding_rules;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> dns_fun::Result<()> {
|
||||
async fn main() -> numa::Result<()> {
|
||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
|
||||
.format_timestamp_millis()
|
||||
.init();
|
||||
|
||||
let config_path = std::env::args()
|
||||
.nth(1)
|
||||
.unwrap_or_else(|| "dns_fun.toml".to_string());
|
||||
.unwrap_or_else(|| "numa.toml".to_string());
|
||||
let config = load_config(&config_path)?;
|
||||
|
||||
let upstream: SocketAddr =
|
||||
format!("{}:{}", config.upstream.address, config.upstream.port).parse()?;
|
||||
let socket = Arc::new(UdpSocket::bind(&config.server.bind_addr).await?);
|
||||
let api_port = config.server.api_port;
|
||||
|
||||
let mut blocklist = BlocklistStore::new();
|
||||
for domain in &config.blocking.allowlist {
|
||||
blocklist.add_to_allowlist(domain);
|
||||
}
|
||||
if !config.blocking.enabled {
|
||||
blocklist.set_enabled(false);
|
||||
}
|
||||
|
||||
// Auto-discover conditional forwarding rules from OS (Tailscale, VPN, etc.)
|
||||
let forwarding_rules = discover_forwarding_rules();
|
||||
|
||||
let ctx = Arc::new(ServerCtx {
|
||||
socket: Arc::clone(&socket),
|
||||
socket: UdpSocket::bind(&config.server.bind_addr).await?,
|
||||
zone_map: build_zone_map(&config.zones)?,
|
||||
cache: Mutex::new(DnsCache::new(
|
||||
config.cache.max_entries,
|
||||
@@ -49,21 +50,72 @@ async fn main() -> dns_fun::Result<()> {
|
||||
config.cache.max_ttl,
|
||||
)),
|
||||
stats: Mutex::new(ServerStats::new()),
|
||||
overrides: Mutex::new(OverrideStore::new()),
|
||||
blocklist: Mutex::new(blocklist),
|
||||
query_log: Mutex::new(QueryLog::new(1000)),
|
||||
forwarding_rules,
|
||||
upstream,
|
||||
timeout: Duration::from_millis(config.upstream.timeout_ms),
|
||||
});
|
||||
|
||||
let zone_count: usize = ctx.zone_map.values().map(|m| m.len()).sum();
|
||||
eprintln!("\n\x1b[38;2;192;98;58m ╔══════════════════════════════════════════╗\x1b[0m");
|
||||
eprintln!("\x1b[38;2;192;98;58m ║\x1b[0m \x1b[1;38;2;192;98;58mNUMA\x1b[0m \x1b[3;38;2;163;152;136mDNS that governs itself\x1b[0m \x1b[38;2;192;98;58m║\x1b[0m");
|
||||
eprintln!("\x1b[38;2;192;98;58m ╠══════════════════════════════════════════╣\x1b[0m");
|
||||
eprintln!("\x1b[38;2;192;98;58m ║\x1b[0m \x1b[38;2;107;124;78mDNS\x1b[0m {:<30}\x1b[38;2;192;98;58m║\x1b[0m", config.server.bind_addr);
|
||||
eprintln!("\x1b[38;2;192;98;58m ║\x1b[0m \x1b[38;2;107;124;78mAPI\x1b[0m http://localhost:{:<16}\x1b[38;2;192;98;58m║\x1b[0m", api_port);
|
||||
eprintln!("\x1b[38;2;192;98;58m ║\x1b[0m \x1b[38;2;107;124;78mDashboard\x1b[0m http://localhost:{:<16}\x1b[38;2;192;98;58m║\x1b[0m", api_port);
|
||||
eprintln!("\x1b[38;2;192;98;58m ║\x1b[0m \x1b[38;2;107;124;78mUpstream\x1b[0m {:<30}\x1b[38;2;192;98;58m║\x1b[0m", upstream);
|
||||
eprintln!("\x1b[38;2;192;98;58m ║\x1b[0m \x1b[38;2;107;124;78mZones\x1b[0m {:<30}\x1b[38;2;192;98;58m║\x1b[0m", format!("{} records", zone_count));
|
||||
eprintln!("\x1b[38;2;192;98;58m ║\x1b[0m \x1b[38;2;107;124;78mCache\x1b[0m {:<30}\x1b[38;2;192;98;58m║\x1b[0m", format!("max {} entries", config.cache.max_entries));
|
||||
eprintln!("\x1b[38;2;192;98;58m ║\x1b[0m \x1b[38;2;107;124;78mBlocking\x1b[0m {:<30}\x1b[38;2;192;98;58m║\x1b[0m",
|
||||
if config.blocking.enabled { format!("{} lists", config.blocking.lists.len()) } else { "disabled".to_string() });
|
||||
if !ctx.forwarding_rules.is_empty() {
|
||||
eprintln!("\x1b[38;2;192;98;58m ║\x1b[0m \x1b[38;2;107;124;78mRouting\x1b[0m {:<30}\x1b[38;2;192;98;58m║\x1b[0m",
|
||||
format!("{} conditional rules", ctx.forwarding_rules.len()));
|
||||
}
|
||||
eprintln!("\x1b[38;2;192;98;58m ╚══════════════════════════════════════════╝\x1b[0m\n");
|
||||
|
||||
info!(
|
||||
"dns_fun starting on {}, upstream {}, {} zone records, cache max {}",
|
||||
config.server.bind_addr,
|
||||
upstream,
|
||||
ctx.zone_map.len(),
|
||||
config.cache.max_entries,
|
||||
"numa listening on {}, upstream {}, {} zone records, cache max {}, API on port {}",
|
||||
config.server.bind_addr, upstream, zone_count, config.cache.max_entries, api_port,
|
||||
);
|
||||
|
||||
// Download blocklists on startup
|
||||
let blocklist_lists = config.blocking.lists.clone();
|
||||
let refresh_hours = config.blocking.refresh_hours;
|
||||
if config.blocking.enabled && !blocklist_lists.is_empty() {
|
||||
let bl_ctx = Arc::clone(&ctx);
|
||||
let bl_lists = blocklist_lists.clone();
|
||||
tokio::spawn(async move {
|
||||
load_blocklists(&bl_ctx, &bl_lists).await;
|
||||
|
||||
// Periodic refresh
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(refresh_hours * 3600));
|
||||
interval.tick().await; // skip immediate tick
|
||||
loop {
|
||||
interval.tick().await;
|
||||
info!("refreshing blocklists...");
|
||||
load_blocklists(&bl_ctx, &bl_lists).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Spawn HTTP API server
|
||||
let api_ctx = Arc::clone(&ctx);
|
||||
let api_addr: SocketAddr = format!("0.0.0.0:{}", api_port).parse()?;
|
||||
tokio::spawn(async move {
|
||||
let app = numa::api::router(api_ctx);
|
||||
let listener = tokio::net::TcpListener::bind(api_addr).await.unwrap();
|
||||
info!("HTTP API listening on {}", api_addr);
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
|
||||
// UDP DNS listener
|
||||
#[allow(clippy::infinite_loop)]
|
||||
loop {
|
||||
let mut buffer = BytePacketBuffer::new();
|
||||
let (_, src_addr) = socket.recv_from(&mut buffer.buf).await?;
|
||||
let (_, src_addr) = ctx.socket.recv_from(&mut buffer.buf).await?;
|
||||
|
||||
let ctx = Arc::clone(&ctx);
|
||||
tokio::spawn(async move {
|
||||
@@ -74,87 +126,28 @@ async fn main() -> dns_fun::Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
mut buffer: BytePacketBuffer,
|
||||
src_addr: SocketAddr,
|
||||
ctx: &ServerCtx,
|
||||
) -> dns_fun::Result<()> {
|
||||
let start = Instant::now();
|
||||
async fn load_blocklists(ctx: &ServerCtx, lists: &[String]) {
|
||||
let downloaded = download_blocklists(lists).await;
|
||||
|
||||
let query = match DnsPacket::from_buffer(&mut buffer) {
|
||||
Ok(packet) => packet,
|
||||
Err(e) => {
|
||||
warn!("{} | PARSE ERROR | {}", src_addr, e);
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let (qname, qtype) = match query.questions.first() {
|
||||
Some(q) => (q.name.clone(), q.qtype),
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
// Pipeline: local zones -> cache -> upstream
|
||||
// Each lock is scoped to avoid holding MutexGuard across await points.
|
||||
let (response, path) = if let Some(records) = ctx.zone_map.get(&(qname.to_lowercase(), qtype)) {
|
||||
let mut resp = DnsPacket::response_from(&query, ResultCode::NOERROR);
|
||||
resp.answers = records.clone();
|
||||
(resp, QueryPath::Local)
|
||||
} else {
|
||||
let cached = ctx.cache.lock().unwrap().lookup(&qname, qtype);
|
||||
if let Some(cached) = cached {
|
||||
let mut resp = cached;
|
||||
resp.header.id = query.header.id;
|
||||
(resp, QueryPath::Cached)
|
||||
} else {
|
||||
match forward_query(&query, ctx.upstream, ctx.timeout).await {
|
||||
Ok(resp) => {
|
||||
ctx.cache.lock().unwrap().insert(&qname, qtype, &resp);
|
||||
(resp, QueryPath::Forwarded)
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"{} | {:?} {} | UPSTREAM ERROR | {}",
|
||||
src_addr, qtype, qname, e
|
||||
);
|
||||
(
|
||||
DnsPacket::response_from(&query, ResultCode::SERVFAIL),
|
||||
QueryPath::UpstreamError,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
info!(
|
||||
"{} | {:?} {} | {} | {} | {}ms",
|
||||
src_addr,
|
||||
qtype,
|
||||
qname,
|
||||
path.as_str(),
|
||||
response.header.rescode.as_str(),
|
||||
elapsed.as_millis(),
|
||||
);
|
||||
|
||||
debug!(
|
||||
"response: {} answers, {} authorities, {} resources",
|
||||
response.answers.len(),
|
||||
response.authorities.len(),
|
||||
response.resources.len(),
|
||||
);
|
||||
|
||||
let mut resp_buffer = BytePacketBuffer::new();
|
||||
response.write(&mut resp_buffer)?;
|
||||
ctx.socket.send_to(resp_buffer.filled(), src_addr).await?;
|
||||
|
||||
// Record stats and log summary every 1000 queries (single lock acquisition)
|
||||
let mut s = ctx.stats.lock().unwrap();
|
||||
let total = s.record(path);
|
||||
if total.is_multiple_of(1000) {
|
||||
s.log_summary();
|
||||
// Parse outside the lock to avoid blocking DNS queries during parse (~100ms)
|
||||
let mut all_domains = std::collections::HashSet::new();
|
||||
let mut sources = Vec::new();
|
||||
for (source, text) in &downloaded {
|
||||
let domains = parse_blocklist(text);
|
||||
info!("blocklist: {} domains from {}", domains.len(), source);
|
||||
all_domains.extend(domains);
|
||||
sources.push(source.clone());
|
||||
}
|
||||
let total = all_domains.len();
|
||||
|
||||
Ok(())
|
||||
// Swap under lock — sub-microsecond
|
||||
ctx.blocklist
|
||||
.lock()
|
||||
.unwrap()
|
||||
.swap_domains(all_domains, sources);
|
||||
info!(
|
||||
"blocking enabled: {} unique domains from {} lists",
|
||||
total,
|
||||
downloaded.len()
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user