feat: add DNS-over-TLS (DoT) listener #25
60
src/dot.rs
60
src/dot.rs
@@ -18,6 +18,7 @@ use crate::packet::DnsPacket;
|
||||
|
||||
const MAX_CONNECTIONS: usize = 512;
|
||||
const IDLE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
// Matches BytePacketBuffer::BUF_SIZE — RFC 7858 allows up to 65535 but our
|
||||
// buffer would silently truncate anything larger.
|
||||
const MAX_MSG_LEN: usize = 4096;
|
||||
@@ -91,7 +92,10 @@ pub async fn start_dot(ctx: Arc<ServerCtx>, config: &DotConfig) {
|
||||
};
|
||||
info!("DoT listening on {}", addr);
|
||||
|
||||
let acceptor = TlsAcceptor::from(tls_config);
|
||||
accept_loop(listener, TlsAcceptor::from(tls_config), ctx).await;
|
||||
}
|
||||
|
||||
async fn accept_loop(listener: TcpListener, acceptor: TlsAcceptor, ctx: Arc<ServerCtx>) {
|
||||
let semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
|
||||
|
||||
loop {
|
||||
@@ -116,12 +120,17 @@ pub async fn start_dot(ctx: Arc<ServerCtx>, config: &DotConfig) {
|
||||
tokio::spawn(async move {
|
||||
let _permit = permit; // held until task exits
|
||||
|
||||
let tls_stream = match acceptor.accept(tcp_stream).await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
let tls_stream =
|
||||
match tokio::time::timeout(HANDSHAKE_TIMEOUT, acceptor.accept(tcp_stream)).await {
|
||||
Ok(Ok(s)) => s,
|
||||
Ok(Err(e)) => {
|
||||
debug!("DoT: TLS handshake failed from {}: {}", remote_addr, e);
|
||||
return;
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("DoT: TLS handshake timeout from {}", remote_addr);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
handle_dot_connection(tls_stream, remote_addr, &ctx).await;
|
||||
@@ -152,18 +161,19 @@ where
|
||||
break;
|
||||
}
|
||||
|
||||
let mut data = vec![0u8; msg_len];
|
||||
if stream.read_exact(&mut data).await.is_err() {
|
||||
let mut buffer = BytePacketBuffer::new();
|
||||
match tokio::time::timeout(IDLE_TIMEOUT, stream.read_exact(&mut buffer.buf[..msg_len]))
|
||||
.await
|
||||
{
|
||||
Ok(Ok(_)) => {}
|
||||
Ok(Err(_)) => break,
|
||||
Err(_) => {
|
||||
debug!("DoT: payload read timeout from {}", remote_addr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Extract query ID before resolve_query consumes the buffer
|
||||
let query_id = data
|
||||
.get(..2)
|
||||
.map(|b| u16::from_be_bytes([b[0], b[1]]))
|
||||
.unwrap_or(0);
|
||||
|
||||
let buffer = BytePacketBuffer::from_bytes(&data);
|
||||
let query_id = u16::from_be_bytes([buffer.buf[0], buffer.buf[1]]);
|
||||
let resp_buffer = match resolve_query(buffer, remote_addr, ctx).await {
|
||||
Ok(buf) => buf,
|
||||
Err(e) => {
|
||||
@@ -296,30 +306,8 @@ mod tests {
|
||||
|
||||
let tls_config = Arc::clone(&*ctx.tls_config.as_ref().unwrap().load());
|
||||
let acceptor = TlsAcceptor::from(tls_config);
|
||||
let semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let (tcp_stream, remote_addr) = match listener.accept().await {
|
||||
Ok(conn) => conn,
|
||||
Err(_) => return,
|
||||
};
|
||||
let permit = match semaphore.clone().try_acquire_owned() {
|
||||
Ok(p) => p,
|
||||
Err(_) => continue,
|
||||
};
|
||||
let acceptor = acceptor.clone();
|
||||
let ctx = Arc::clone(&ctx);
|
||||
tokio::spawn(async move {
|
||||
let _permit = permit;
|
||||
let tls_stream = match acceptor.accept(tcp_stream).await {
|
||||
Ok(s) => s,
|
||||
Err(_) => return,
|
||||
};
|
||||
handle_dot_connection(tls_stream, remote_addr, &ctx).await;
|
||||
});
|
||||
}
|
||||
});
|
||||
tokio::spawn(accept_loop(listener, acceptor, ctx));
|
||||
|
||||
(addr, client_tls)
|
||||
}
|
||||
|
||||
@@ -870,14 +870,25 @@ mod tests {
|
||||
};
|
||||
let handler = handler.clone();
|
||||
tokio::spawn(async move {
|
||||
let timeout = std::time::Duration::from_secs(5);
|
||||
// Read length-prefixed DNS query
|
||||
let mut len_buf = [0u8; 2];
|
||||
if stream.read_exact(&mut len_buf).await.is_err() {
|
||||
if tokio::time::timeout(timeout, stream.read_exact(&mut len_buf))
|
||||
.await
|
||||
.ok()
|
||||
.and_then(|r| r.ok())
|
||||
.is_none()
|
||||
{
|
||||
return;
|
||||
}
|
||||
let len = u16::from_be_bytes(len_buf) as usize;
|
||||
let mut data = vec![0u8; len];
|
||||
if stream.read_exact(&mut data).await.is_err() {
|
||||
if tokio::time::timeout(timeout, stream.read_exact(&mut data))
|
||||
.await
|
||||
.ok()
|
||||
.and_then(|r| r.ok())
|
||||
.is_none()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user