add LAN service discovery via UDP multicast
Numa instances on the same network auto-discover each other's .numa services. No config, no cloud — just multicast on 239.255.70.78:5390. - PeerStore with lazy expiry (90s timeout, 30s broadcast interval) - DNS resolves remote .numa services to peer's LAN IP (not localhost) - Proxy forwards to peer IP for remote services - Graceful degradation if multicast bind fails Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -25,6 +25,8 @@ pub struct Config {
|
||||
pub proxy: ProxyConfig,
|
||||
#[serde(default)]
|
||||
pub services: Vec<ServiceConfig>,
|
||||
#[serde(default)]
|
||||
pub lan: LanConfig,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -202,6 +204,48 @@ pub struct ServiceConfig {
|
||||
pub target_port: u16,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Clone)]
|
||||
pub struct LanConfig {
|
||||
#[serde(default = "default_lan_enabled")]
|
||||
pub enabled: bool,
|
||||
#[serde(default = "default_lan_multicast_group")]
|
||||
pub multicast_group: String,
|
||||
#[serde(default = "default_lan_port")]
|
||||
pub port: u16,
|
||||
#[serde(default = "default_lan_broadcast_interval")]
|
||||
pub broadcast_interval_secs: u64,
|
||||
#[serde(default = "default_lan_peer_timeout")]
|
||||
pub peer_timeout_secs: u64,
|
||||
}
|
||||
|
||||
impl Default for LanConfig {
|
||||
fn default() -> Self {
|
||||
LanConfig {
|
||||
enabled: default_lan_enabled(),
|
||||
multicast_group: default_lan_multicast_group(),
|
||||
port: default_lan_port(),
|
||||
broadcast_interval_secs: default_lan_broadcast_interval(),
|
||||
peer_timeout_secs: default_lan_peer_timeout(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn default_lan_enabled() -> bool {
|
||||
true
|
||||
}
|
||||
fn default_lan_multicast_group() -> String {
|
||||
"239.255.70.78".to_string()
|
||||
}
|
||||
fn default_lan_port() -> u16 {
|
||||
5390
|
||||
}
|
||||
fn default_lan_broadcast_interval() -> u64 {
|
||||
30
|
||||
}
|
||||
fn default_lan_peer_timeout() -> u64 {
|
||||
90
|
||||
}
|
||||
|
||||
pub fn load_config(path: &str) -> Result<Config> {
|
||||
if !Path::new(path).exists() {
|
||||
return Ok(Config::default());
|
||||
|
||||
29
src/ctx.rs
29
src/ctx.rs
@@ -16,6 +16,7 @@ use crate::packet::DnsPacket;
|
||||
use crate::query_log::{QueryLog, QueryLogEntry};
|
||||
use crate::question::QueryType;
|
||||
use crate::record::DnsRecord;
|
||||
use crate::lan::PeerStore;
|
||||
use crate::service_store::ServiceStore;
|
||||
use crate::stats::{QueryPath, ServerStats};
|
||||
use crate::system_dns::ForwardingRule;
|
||||
@@ -29,6 +30,7 @@ pub struct ServerCtx {
|
||||
pub blocklist: Mutex<BlocklistStore>,
|
||||
pub query_log: Mutex<QueryLog>,
|
||||
pub services: Mutex<ServiceStore>,
|
||||
pub lan_peers: Mutex<PeerStore>,
|
||||
pub forwarding_rules: Vec<ForwardingRule>,
|
||||
pub upstream: SocketAddr,
|
||||
pub timeout: Duration,
|
||||
@@ -67,16 +69,39 @@ pub async fn handle_query(
|
||||
} else if !ctx.proxy_tld_suffix.is_empty()
|
||||
&& (qname.ends_with(&ctx.proxy_tld_suffix) || qname == ctx.proxy_tld)
|
||||
{
|
||||
// Resolve .numa: local services → 127.0.0.1, LAN peers → peer IP
|
||||
let service_name = qname
|
||||
.strip_suffix(&ctx.proxy_tld_suffix)
|
||||
.unwrap_or(&qname);
|
||||
let resolve_ip = {
|
||||
let local = ctx.services.lock().unwrap();
|
||||
if local.lookup(service_name).is_some() {
|
||||
std::net::Ipv4Addr::LOCALHOST
|
||||
} else {
|
||||
let mut peers = ctx.lan_peers.lock().unwrap();
|
||||
peers
|
||||
.lookup(service_name)
|
||||
.and_then(|(ip, _)| match ip {
|
||||
std::net::IpAddr::V4(v4) => Some(v4),
|
||||
_ => None,
|
||||
})
|
||||
.unwrap_or(std::net::Ipv4Addr::LOCALHOST)
|
||||
}
|
||||
};
|
||||
let mut resp = DnsPacket::response_from(&query, ResultCode::NOERROR);
|
||||
match qtype {
|
||||
QueryType::AAAA => resp.answers.push(DnsRecord::AAAA {
|
||||
domain: qname.clone(),
|
||||
addr: std::net::Ipv6Addr::LOCALHOST,
|
||||
addr: if resolve_ip == std::net::Ipv4Addr::LOCALHOST {
|
||||
std::net::Ipv6Addr::LOCALHOST
|
||||
} else {
|
||||
resolve_ip.to_ipv6_mapped()
|
||||
},
|
||||
ttl: 300,
|
||||
}),
|
||||
_ => resp.answers.push(DnsRecord::A {
|
||||
domain: qname.clone(),
|
||||
addr: std::net::Ipv4Addr::LOCALHOST,
|
||||
addr: resolve_ip,
|
||||
ttl: 300,
|
||||
}),
|
||||
}
|
||||
|
||||
198
src/lan.rs
Normal file
198
src/lan.rs
Normal file
@@ -0,0 +1,198 @@
|
||||
use std::collections::HashMap;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use log::{debug, info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::config::LanConfig;
|
||||
use crate::ctx::ServerCtx;
|
||||
|
||||
// --- Peer Store ---
|
||||
|
||||
pub struct PeerStore {
|
||||
peers: HashMap<String, (IpAddr, u16, Instant)>,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl PeerStore {
|
||||
pub fn new(timeout_secs: u64) -> Self {
|
||||
PeerStore {
|
||||
peers: HashMap::new(),
|
||||
timeout: Duration::from_secs(timeout_secs),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update(&mut self, host: IpAddr, services: &[(String, u16)]) {
|
||||
let now = Instant::now();
|
||||
for (name, port) in services {
|
||||
self.peers
|
||||
.insert(name.to_lowercase(), (host, *port, now));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn lookup(&mut self, name: &str) -> Option<(IpAddr, u16)> {
|
||||
let key = name.to_lowercase();
|
||||
let entry = self.peers.get(&key)?;
|
||||
if entry.2.elapsed() > self.timeout {
|
||||
self.peers.remove(&key);
|
||||
return None;
|
||||
}
|
||||
Some((entry.0, entry.1))
|
||||
}
|
||||
|
||||
pub fn list(&mut self) -> Vec<(String, IpAddr, u16, u64)> {
|
||||
let now = Instant::now();
|
||||
self.peers.retain(|_, (_, _, seen)| now.duration_since(*seen) < self.timeout);
|
||||
self.peers
|
||||
.iter()
|
||||
.map(|(name, (ip, port, seen))| {
|
||||
(name.clone(), *ip, *port, now.duration_since(*seen).as_secs())
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
// --- Multicast ---
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct Announcement {
|
||||
host: String,
|
||||
services: Vec<AnnouncedService>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct AnnouncedService {
|
||||
name: String,
|
||||
port: u16,
|
||||
}
|
||||
|
||||
pub fn detect_lan_ip() -> Option<Ipv4Addr> {
|
||||
let socket = std::net::UdpSocket::bind("0.0.0.0:0").ok()?;
|
||||
socket.connect("8.8.8.8:80").ok()?;
|
||||
match socket.local_addr().ok()? {
|
||||
SocketAddr::V4(addr) => Some(*addr.ip()),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_lan_discovery(ctx: Arc<ServerCtx>, config: &LanConfig) {
|
||||
let multicast_group: Ipv4Addr = match config.multicast_group.parse() {
|
||||
Ok(g) => g,
|
||||
Err(e) => {
|
||||
warn!("LAN: invalid multicast group {}: {}", config.multicast_group, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let port = config.port;
|
||||
let interval = Duration::from_secs(config.broadcast_interval_secs);
|
||||
|
||||
let local_ip = detect_lan_ip().unwrap_or(Ipv4Addr::LOCALHOST);
|
||||
info!("LAN discovery on {}:{}, local IP {}", multicast_group, port, local_ip);
|
||||
|
||||
// Create socket with SO_REUSEADDR for multicast
|
||||
let std_socket = match create_multicast_socket(multicast_group, port) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
warn!("LAN: could not bind multicast socket: {} — LAN discovery disabled", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let socket = match tokio::net::UdpSocket::from_std(std_socket) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
warn!("LAN: tokio socket conversion failed: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let socket = Arc::new(socket);
|
||||
|
||||
// Spawn sender
|
||||
let sender_ctx = Arc::clone(&ctx);
|
||||
let sender_socket = Arc::clone(&socket);
|
||||
let local_ip_str = local_ip.to_string();
|
||||
let self_filter = local_ip_str.clone();
|
||||
let dest = SocketAddr::new(IpAddr::V4(multicast_group), port);
|
||||
tokio::spawn(async move {
|
||||
let mut ticker = tokio::time::interval(interval);
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
let services: Vec<AnnouncedService> = {
|
||||
let store = sender_ctx.services.lock().unwrap();
|
||||
store
|
||||
.list()
|
||||
.iter()
|
||||
.map(|e| AnnouncedService {
|
||||
name: e.name.clone(),
|
||||
port: e.target_port,
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
if services.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let announcement = Announcement {
|
||||
host: local_ip_str.clone(),
|
||||
services,
|
||||
};
|
||||
if let Ok(json) = serde_json::to_vec(&announcement) {
|
||||
let _ = sender_socket.send_to(&json, dest).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Receiver loop
|
||||
let mut buf = vec![0u8; 4096];
|
||||
loop {
|
||||
let (len, src) = match socket.recv_from(&mut buf).await {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
debug!("LAN recv error: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let announcement: Announcement = match serde_json::from_slice(&buf[..len]) {
|
||||
Ok(a) => a,
|
||||
Err(_) => continue,
|
||||
};
|
||||
// Skip self-announcements
|
||||
if announcement.host == self_filter {
|
||||
continue;
|
||||
}
|
||||
let peer_ip: IpAddr = match announcement.host.parse() {
|
||||
Ok(ip) => ip,
|
||||
Err(_) => continue,
|
||||
};
|
||||
let services: Vec<(String, u16)> = announcement
|
||||
.services
|
||||
.iter()
|
||||
.map(|s| (s.name.clone(), s.port))
|
||||
.collect();
|
||||
let count = services.len();
|
||||
ctx.lan_peers.lock().unwrap().update(peer_ip, &services);
|
||||
debug!(
|
||||
"LAN: {} services from {} (via {})",
|
||||
count, announcement.host, src
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn create_multicast_socket(
|
||||
group: Ipv4Addr,
|
||||
port: u16,
|
||||
) -> std::io::Result<std::net::UdpSocket> {
|
||||
use std::net::SocketAddrV4;
|
||||
|
||||
let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port);
|
||||
let socket = socket2::Socket::new(
|
||||
socket2::Domain::IPV4,
|
||||
socket2::Type::DGRAM,
|
||||
Some(socket2::Protocol::UDP),
|
||||
)?;
|
||||
socket.set_reuse_address(true)?;
|
||||
socket.set_nonblocking(true)?;
|
||||
socket.bind(&socket2::SockAddr::from(addr))?;
|
||||
socket.join_multicast_v4(&group, &Ipv4Addr::UNSPECIFIED)?;
|
||||
Ok(socket.into())
|
||||
}
|
||||
@@ -6,6 +6,7 @@ pub mod config;
|
||||
pub mod ctx;
|
||||
pub mod forward;
|
||||
pub mod header;
|
||||
pub mod lan;
|
||||
pub mod override_store;
|
||||
pub mod packet;
|
||||
pub mod proxy;
|
||||
|
||||
14
src/main.rs
14
src/main.rs
@@ -127,6 +127,7 @@ async fn main() -> numa::Result<()> {
|
||||
blocklist: Mutex::new(blocklist),
|
||||
query_log: Mutex::new(QueryLog::new(1000)),
|
||||
services: Mutex::new(service_store),
|
||||
lan_peers: Mutex::new(numa::lan::PeerStore::new(config.lan.peer_timeout_secs)),
|
||||
forwarding_rules,
|
||||
upstream,
|
||||
timeout: Duration::from_millis(config.upstream.timeout_ms),
|
||||
@@ -161,6 +162,10 @@ async fn main() -> numa::Result<()> {
|
||||
};
|
||||
eprintln!("\x1b[38;2;192;98;58m ║\x1b[0m \x1b[38;2;107;124;78mProxy\x1b[0m {:<30}\x1b[38;2;192;98;58m║\x1b[0m", schemes);
|
||||
}
|
||||
if config.lan.enabled {
|
||||
eprintln!("\x1b[38;2;192;98;58m ║\x1b[0m \x1b[38;2;107;124;78mLAN\x1b[0m {:<30}\x1b[38;2;192;98;58m║\x1b[0m",
|
||||
format!("{}:{}", config.lan.multicast_group, config.lan.port));
|
||||
}
|
||||
if !ctx.forwarding_rules.is_empty() {
|
||||
eprintln!("\x1b[38;2;192;98;58m ║\x1b[0m \x1b[38;2;107;124;78mRouting\x1b[0m {:<30}\x1b[38;2;192;98;58m║\x1b[0m",
|
||||
format!("{} conditional rules", ctx.forwarding_rules.len()));
|
||||
@@ -235,6 +240,15 @@ async fn main() -> numa::Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn LAN service discovery
|
||||
if config.lan.enabled {
|
||||
let lan_ctx = Arc::clone(&ctx);
|
||||
let lan_config = config.lan.clone();
|
||||
tokio::spawn(async move {
|
||||
numa::lan::start_lan_discovery(lan_ctx, &lan_config).await;
|
||||
});
|
||||
}
|
||||
|
||||
// UDP DNS listener
|
||||
#[allow(clippy::infinite_loop)]
|
||||
loop {
|
||||
|
||||
15
src/proxy.rs
15
src/proxy.rs
@@ -135,11 +135,15 @@ async fn proxy_handler(State(state): State<ProxyState>, req: Request) -> axum::r
|
||||
}
|
||||
};
|
||||
|
||||
let target_port = {
|
||||
let (target_host, target_port) = {
|
||||
let store = state.ctx.services.lock().unwrap();
|
||||
match store.lookup(&service_name) {
|
||||
Some(entry) => entry.target_port,
|
||||
None => {
|
||||
if let Some(entry) = store.lookup(&service_name) {
|
||||
("localhost".to_string(), entry.target_port)
|
||||
} else {
|
||||
let mut peers = state.ctx.lan_peers.lock().unwrap();
|
||||
match peers.lookup(&service_name) {
|
||||
Some((ip, port)) => (ip.to_string(), port),
|
||||
None => {
|
||||
return (
|
||||
StatusCode::NOT_FOUND,
|
||||
[(hyper::header::CONTENT_TYPE, "text/html; charset=utf-8")],
|
||||
@@ -259,6 +263,7 @@ pre .str {{ color: #d48a5a }}
|
||||
),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -268,7 +273,7 @@ pre .str {{ color: #d48a5a }}
|
||||
.path_and_query()
|
||||
.map(|pq| pq.as_str())
|
||||
.unwrap_or("/");
|
||||
let target_uri: hyper::Uri = format!("http://localhost:{}{}", target_port, path_and_query)
|
||||
let target_uri: hyper::Uri = format!("http://{}:{}{}", target_host, target_port, path_and_query)
|
||||
.parse()
|
||||
.unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user