32 Commits

Author SHA1 Message Date
Eduard Ghenea
6fc59a7bd5 fix: replace truecolor ANSI codes with 256-color for consistent terminal display
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-17 12:05:22 +03:00
Razvan Dimescu
f9ce82f4b0 Merge pull request #107 from razvandimescu/feat/windows-service
feat(windows): run as a real SCM service, not a Run-key autostart
2026-04-17 02:02:43 +03:00
Razvan Dimescu
1d9495c013 ci: bridge DNS gap with direct upstream instead of polling
systemd-resolved has a ~40s reconfiguration stall after restart
(systemd #22521) that breaks the GHA runner's persistent connection
to results-receiver.actions.githubusercontent.com. Polling for DNS
recovery isn't enough since the .NET runner agent caches DNS at the
connection-pool level. Replace the broken stub-resolv symlink with a
direct upstream so DNS works instantly.
2026-04-17 01:32:36 +03:00
Razvan Dimescu
34b75833b8 ci: poll for DNS recovery in cleanup, not test step
Move DNS recovery wait into the cleanup step (if: always) so it runs
regardless of test outcome. Use getent hosts loop instead of sleep+dig
to match what post-steps actually use for resolution.
2026-04-17 01:11:20 +03:00
Razvan Dimescu
99af97a67b ci: wait for DNS recovery after uninstall on Linux
systemd-resolved needs a moment to restore its stub listener after
the numa drop-in is removed. Without a wait, the runner can't resolve
GitHub's API to report job completion.
2026-04-16 20:20:53 +03:00
Razvan Dimescu
9e56054f37 ci: add integration tests for install/uninstall lifecycle
Release-build + install/verify/re-install/uninstall cycle on Linux and
macOS. Runs after lint/test passes (needs dependency). Cleanup step
uses if: always() to handle cancellation.
2026-04-16 19:56:44 +03:00
Razvan Dimescu
fe9f31616e test: add SCM output parsing and config path regression tests
Extract parse_sc_registered and parse_sc_state as testable pure
functions. 8 new tests covering: service registration detection,
service state parsing, and Windows config_dir == data_dir invariant.
2026-04-16 19:31:26 +03:00
Razvan Dimescu
9f08d8b489 fix(windows): stop service before port probe, wait for full exit
Stop the running service before disabling Dnscache so the port 53 probe
sees the real state (not Numa's own binding). Wait for SCM STOPPED
state before copying the binary to avoid os error 32 (file in use).
2026-04-16 19:21:56 +03:00
Razvan Dimescu
9bea038cb6 fix(windows): unify config/data dir and add service log file
config_dir() on Windows now returns data_dir() (ProgramData) so config,
services.json, and log file are in the same place for both interactive
and service contexts. Service mode writes logs to numa.log via
env_logger pipe. Dashboard shows correct log path per OS.
2026-04-16 19:12:42 +03:00
Razvan Dimescu
f0a1dd7106 fix(dashboard): hide logs path on Windows (no log sink yet) 2026-04-16 19:01:34 +03:00
Razvan Dimescu
6789c321bc fix(windows): defer DNS redirect until port 53 is free
Probe port 53 after disabling Dnscache instead of assuming reboot is
needed. Skip DNS redirect when port is blocked (service does it on
first boot). Fix readiness probe: TCP connect to API port instead of
broken UDP send_to that always succeeded.
2026-04-16 18:35:09 +03:00
Razvan Dimescu
da40a8dbfc ci: fetch full history on Windows so build.rs embeds git SHA 2026-04-16 18:08:48 +03:00
Razvan Dimescu
65e65028a0 fix(windows): separate service lifecycle from install flow
service start/stop/restart/status now map to proper SCM operations
instead of re-running the full install/uninstall flow. On re-install,
stop the running service first so the binary can be overwritten.
2026-04-16 16:59:54 +03:00
Razvan Dimescu
d3eab73a31 fix: use sort_by_key to satisfy clippy unnecessary_sort_by 2026-04-16 16:13:15 +03:00
Razvan Dimescu
22ec684e48 Merge remote-tracking branch 'origin/main' into feat/windows-service
# Conflicts:
#	src/main.rs
2026-04-16 16:06:49 +03:00
Razvan Dimescu
aa040fd8a4 Merge pull request #111 from razvandimescu/fix/allowlist-input-focus
fix(dashboard): allowlist input erased by polling refresh
2026-04-16 15:27:02 +03:00
Razvan Dimescu
b69cc89d38 fix(dashboard): skip allowlist re-render while input has focus
The polling refresh replaced the entire allowlist panel innerHTML every
2 seconds, destroying the input field mid-typing. Users had to
paste-and-enter faster than the refresh interval — #106 reported this
as text "timing out and erasing."

Guard: skip renderAllowlist() when allowDomainInput has focus.
2026-04-16 15:12:00 +03:00
Razvan Dimescu
ebb801650e Merge pull request #110 from razvandimescu/feat/build-version
feat: embed git SHA in version string
2026-04-16 13:41:23 +03:00
Razvan Dimescu
30bb7365c9 refactor: robust git-describe parsing for pre-release tags
Switch to --long flag so format is always TAG-N-gSHA[-dirty], then
split from the right. Handles pre-release tags (v0.14.0-rc1) that
broke the previous left-split approach. Remove ineffective directory
watch on .git/refs/tags/. Trim comments.
2026-04-16 13:18:56 +03:00
Razvan Dimescu
0118ab0f44 feat: embed git SHA in version string via build.rs
Adds a build.rs that runs `git describe --tags --always --dirty` and
sets NUMA_BUILD_VERSION at compile time. A new `numa::version()` helper
returns the build version, falling back to CARGO_PKG_VERSION when git
is unavailable (source tarballs, Docker builds without .git).

Version strings:
  tagged release:      0.13.1
  commits ahead:       0.13.1+a87f907
  uncommitted changes: 0.13.1+a87f907-dirty
  no git:              0.13.1

Replaces all 6 inline env!("CARGO_PKG_VERSION") call sites with the
single version() function.
2026-04-16 13:02:25 +03:00
Razvan Dimescu
a87f907d20 Merge pull request #109 from razvandimescu/feat/dashboard-version
feat(dashboard): version in header, restructure footer
2026-04-16 11:29:54 +03:00
Razvan Dimescu
1c5e703330 fix(dashboard): collapse header on mobile (≤700px)
Hide tagline, version tag, and Phone Setup on narrow viewports so
the header stays single-row: logo + status dot + blocking toggle.
Reduces logo font-size from 1.8rem to 1.4rem on mobile.
2026-04-16 06:39:29 +03:00
Razvan Dimescu
cc635f2f73 feat(dashboard): show version in header, restructure footer
Closes #108.

- Add `version` field to /stats (from CARGO_PKG_VERSION).
- Show `v0.13.1` next to the Numa wordmark in the dashboard header.
- Restructure the footer into two semantic rows:
  Row 1 (paths): Config · Data · Logs (platform-detected)
  Row 2 (runtime): Upstream · DNSSEC · SRTT · GitHub
- Drop Mode from the footer (redundant with Upstream label).
- Show only the matching-platform log path instead of both
  macOS and Linux unconditionally.
2026-04-16 06:15:48 +03:00
Razvan Dimescu
7bb484ada3 refactor(windows): deduplicate after simplify review
- Drop the duplicate WINDOWS_SERVICE_NAME constant; call sites use the
  single source of truth at windows_service::SERVICE_NAME.
- windows_service_exe_path and service_config_path now compose from
  crate::data_dir() instead of re-parsing %PROGRAMDATA% locally.
- Factor the 6× sc.exe invocation boilerplate into a run_sc helper.
- Replace the 200ms try_recv polling loop in the service dispatcher
  with a recv_timeout wait — cuts shutdown latency and idle CPU.
- stop_service_scm/delete_service_scm now log warnings instead of
  silently swallowing failures, so unexpected errors are visible.
2026-04-15 23:48:09 +03:00
Razvan Dimescu
b610160cd1 feat(windows): run numa as a real SCM service, drop Run-key autostart
Hooks the service-dispatcher scaffolding from the previous commit to
actually serve DNS, and replaces the HKLM\…\Run login-time autostart
with a proper Windows service created via sc.exe.

**Refactor**
- Extract main.rs's inline server body (~500 lines) into `numa::serve::run`
  so both the interactive CLI entry and the service dispatcher drive the
  same startup/serve loop. main.rs is now a thin subcommand router.
- main.rs goes sync (no #[tokio::main]); each branch that needs async
  builds its own runtime and block_on's. Required so the --service path
  can hand off to SCM without fighting tokio for the entry thread.

**Windows service wrapper**
- `numa::windows_service::run_service` now builds a multi-thread tokio
  runtime on a dedicated thread and runs `serve::run` inside it. Stop/
  Shutdown from SCM aborts the wait loop and reports SERVICE_STOPPED.
- Config path resolves to `%PROGRAMDATA%\numa\numa.toml` when running
  under SCM (SYSTEM's cwd is System32, relative paths don't work).

**Install/uninstall**
- `install_windows` now copies numa.exe to a stable
  `%PROGRAMDATA%\numa\bin\numa.exe` and registers it via `sc create`
  with start=auto, obj=LocalSystem, and a failure policy of
  restart/5000/restart/5000/restart/10000. Starts the service
  immediately when no reboot is pending.
- `uninstall_windows` stops + deletes the service and removes the
  binary copy before restoring DNS.
- Drops the old `register_autostart` / `remove_autostart` helpers that
  wrote to `HKLM\SOFTWARE\Microsoft\Windows\CurrentVersion\Run` — that
  path runs at user login in the user's session with no stderr capture
  and no crash-restart policy, which is why we've been flying blind in
  every Windows debug session.

DNS-set bugs (netsh destructive static, IPv6 not touched, uninstall
secondary-drop) and file logging are orthogonal — tracked for follow-up.
2026-04-15 22:24:23 +03:00
Razvan Dimescu
cea4b0ef88 feat(windows): add windows-service crate + SCM dispatcher scaffold
Lets numa.exe act as a real Windows service registered with the SCM,
replacing the HKLM\...\Run login-time autostart that runs in the user
session without stderr capture.

- New `numa::windows_service` module (cfg(windows)) wraps Mullvad's
  `windows-service` crate: registers with SCM, reports Running, handles
  Stop/Shutdown, reports Stopped.
- `numa.exe --service` is the entry point SCM uses
  (`sc create … binPath="numa.exe --service"`); interactive invocations
  are unchanged.
- Dep is gated `[target.'cfg(windows)'.dependencies]` — zero impact on
  macOS/Linux builds or binary size.

Scaffold only. The service currently blocks on an mpsc channel until
Stop arrives; the actual serve loop will hook in once main.rs's inline
server body is extracted into `numa::serve(config_path)` in a follow-up.
This lets `sc start Numa` / `sc stop Numa` be verified end to end today.
2026-04-15 22:14:36 +03:00
Razvan Dimescu
43a5ca4bd5 Merge pull request #105 from razvandimescu/chore/audit-rustls-webpki
chore(deps): bump rustls-webpki to 0.103.12
2026-04-15 14:41:19 +03:00
Razvan Dimescu
b403671e11 chore(deps): bump rustls-webpki to 0.103.12
Patches RUSTSEC-2026-0098 (URI name constraints incorrectly accepted)
and RUSTSEC-2026-0099 (wildcard cert name constraints), both published
2026-04-14. Transitive via reqwest / rustls / hickory / quinn.
2026-04-15 14:27:17 +03:00
Razvan Dimescu
6f0144b237 Merge pull request #103 from razvandimescu/feat/upstream-log-label
feat: distinguish UPSTREAM vs FORWARD in logs and stats
2026-04-15 14:00:28 +03:00
Razvan Dimescu
4bd08e206d feat(dashboard): hide zero-count path and transport rows 2026-04-14 21:25:11 +03:00
Razvan Dimescu
ebb2a5db39 refactor: simplify upstream-path test — reuse pool mutex, drop narrating comment 2026-04-14 18:26:45 +03:00
Razvan Dimescu
e0e0f50838 feat: distinguish UPSTREAM vs FORWARD in logs and stats
Queries matching a [[forwarding]] suffix rule now log as FORWARD;
queries resolved via the default [upstream] pool log as UPSTREAM.
Previously both paths shared the FORWARD label, making it impossible
to tell from logs whether a rule matched.

Adds QueryPath::Upstream, a queries.upstream stats counter exposed
via /stats, plus a matching dashboard filter, bar, and path tag.

Closes part of #102.
2026-04-14 18:18:32 +03:00
17 changed files with 1434 additions and 727 deletions

View File

@@ -56,6 +56,8 @@ jobs:
runs-on: windows-latest
steps:
- uses: actions/checkout@v6
with:
fetch-depth: 0
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- name: build
@@ -69,3 +71,62 @@ jobs:
with:
name: numa-windows-x86_64
path: target/debug/numa.exe
integration-linux:
needs: [check]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- name: build
run: cargo build --release
- name: install / verify / re-install / uninstall
run: |
sudo ./target/release/numa install
sleep 2
curl -sf http://127.0.0.1:5380/health
dig @127.0.0.1 example.com +short +timeout=5 | grep -q '.'
sudo ./target/release/numa install
sleep 2
curl -sf http://127.0.0.1:5380/health
sudo ./target/release/numa uninstall
sleep 1
! curl -sf http://127.0.0.1:5380/health 2>/dev/null
- name: cleanup
if: always()
run: |
sudo ./target/release/numa uninstall 2>/dev/null || true
# systemd-resolved has a ~40s DNS reconfiguration stall after
# restart (systemd issue #22521) that breaks the runner agent's
# connection to GitHub. Bridge it by replacing the stub-resolv
# symlink with a direct upstream — DNS works instantly and the
# runner can phone home for post-job steps.
sudo rm -f /etc/resolv.conf
echo "nameserver 8.8.8.8" | sudo tee /etc/resolv.conf > /dev/null
getent hosts github.com >/dev/null
integration-macos:
needs: [check-macos]
runs-on: macos-latest
steps:
- uses: actions/checkout@v6
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- name: build
run: cargo build --release
- name: install / verify / re-install / uninstall
run: |
sudo ./target/release/numa install
sleep 2
curl -sf http://127.0.0.1:5380/health
dig @127.0.0.1 example.com +short +timeout=5 | grep -q '.'
sudo ./target/release/numa install
sleep 2
curl -sf http://127.0.0.1:5380/health
sudo ./target/release/numa uninstall
sleep 1
! curl -sf http://127.0.0.1:5380/health 2>/dev/null
- name: cleanup
if: always()
run: sudo ./target/release/numa uninstall 2>/dev/null || true

1
.gitignore vendored
View File

@@ -6,3 +6,4 @@ site/blog/posts/
ios/
drafts/
site/blog/index.html
.DS_Store

16
Cargo.lock generated
View File

@@ -1359,6 +1359,7 @@ dependencies = [
"toml",
"tower",
"webpki-roots 1.0.6",
"windows-service",
"x509-parser",
]
@@ -1834,9 +1835,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
version = "0.103.10"
version = "0.103.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef"
checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06"
dependencies = [
"aws-lc-rs",
"ring",
@@ -2583,6 +2584,17 @@ dependencies = [
"windows-link",
]
[[package]]
name = "windows-service"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d24d6bcc7f734a4091ecf8d7a64c5f7d7066f45585c1861eba06449909609c8a"
dependencies = [
"bitflags",
"widestring",
"windows-sys 0.52.0",
]
[[package]]
name = "windows-strings"
version = "0.5.1"

View File

@@ -33,6 +33,9 @@ rustls-pemfile = "2.2.0"
qrcode = { version = "0.14", default-features = false, features = ["svg"] }
webpki-roots = "1"
[target.'cfg(windows)'.dependencies]
windows-service = "0.7"
[dev-dependencies]
criterion = { version = "0.8", features = ["html_reports"] }
tower = { version = "0.5", features = ["util"] }

48
build.rs Normal file
View File

@@ -0,0 +1,48 @@
fn main() {
// --long forces "TAG-N-gSHA[-dirty]" format even on exact tag matches,
// making parsing unambiguous for pre-release tags like v0.14.0-rc1.
let git_version = std::process::Command::new("git")
.args(["describe", "--tags", "--always", "--dirty", "--long"])
.output()
.ok()
.filter(|o| o.status.success())
.and_then(|o| String::from_utf8(o.stdout).ok())
.and_then(|raw| parse_git_describe(raw.trim()));
if let Some(v) = git_version {
println!("cargo:rustc-env=NUMA_BUILD_VERSION={}", v);
}
println!("cargo:rerun-if-changed=.git/HEAD");
}
/// Parse `git describe --long` output into a SemVer-compatible string.
/// "v0.13.1-0-ga87f907" → "0.13.1"
/// "v0.13.1-9-ga87f907" → "0.13.1+a87f907"
/// "v0.14.0-rc1-0-ga87f907" → "0.14.0-rc1"
/// "v0.14.0-rc1-3-ga87f907-dirty" → "0.14.0-rc1+a87f907-dirty"
/// "a87f907" → "0.0.0+a87f907"
fn parse_git_describe(s: &str) -> Option<String> {
let s = s.strip_prefix('v').unwrap_or(s);
let dirty = s.ends_with("-dirty");
let s = s.strip_suffix("-dirty").unwrap_or(s);
// --long format: TAG-N-gSHA. Split from the right so tags with hyphens work.
let gpos = s.rfind("-g")?;
let sha = &s[gpos + 2..];
let rest = &s[..gpos];
let npos = rest.rfind('-')?;
let n: u32 = rest[npos + 1..].parse().ok()?;
let tag = &rest[..npos];
if tag.is_empty() {
return Some(format!("0.0.0+{}", sha));
}
Some(match (n, dirty) {
(0, false) => tag.to_string(),
(0, true) => format!("{}+{}-dirty", tag, sha),
(_, false) => format!("{}+{}", tag, sha),
(_, true) => format!("{}+{}-dirty", tag, sha),
})
}

View File

@@ -217,6 +217,7 @@ body {
min-width: 2px;
}
.path-bar-fill.forward { background: var(--amber); }
.path-bar-fill.upstream { background: var(--amber-dim); }
.path-bar-fill.recursive { background: var(--cyan); }
.path-bar-fill.cached { background: var(--teal); }
.path-bar-fill.local { background: var(--violet); }
@@ -285,6 +286,7 @@ body {
font-weight: 500;
}
.path-tag.FORWARD { background: rgba(192, 98, 58, 0.12); color: var(--amber-dim); }
.path-tag.UPSTREAM { background: rgba(160, 120, 72, 0.12); color: var(--amber-dim); }
.path-tag.RECURSIVE { background: rgba(74, 124, 138, 0.12); color: var(--cyan); }
.path-tag.CACHED { background: rgba(107, 124, 78, 0.12); color: var(--teal-dim); }
.path-tag.LOCAL { background: rgba(100, 116, 139, 0.12); color: var(--violet-dim); }
@@ -550,7 +552,11 @@ body {
@media (max-width: 700px) {
.stats-row { grid-template-columns: repeat(2, 1fr); }
.dashboard { padding: 1rem; }
.header { padding: 1rem; }
.header { padding: 0.8rem 1rem; }
.logo { font-size: 1.4rem; }
.tagline { display: none; }
#headerVersion { display: none; }
#phoneSetup { display: none; }
}
</style>
</head>
@@ -559,6 +565,7 @@ body {
<div class="header">
<div class="header-left">
<div class="logo">Numa</div>
<span id="headerVersion" style="font-family:var(--font-mono);font-size:0.68rem;color:var(--text-dim);"></span>
<div class="tagline">DNS that governs itself</div>
</div>
<div style="display:flex;align-items:center;gap:1.2rem;">
@@ -655,6 +662,7 @@ body {
<option value="RECURSIVE">recursive</option>
<option value="COALESCED">coalesced</option>
<option value="FORWARD">forward</option>
<option value="UPSTREAM">upstream</option>
<option value="CACHED">cached</option>
<option value="BLOCKED">blocked</option>
<option value="OVERRIDE">override</option>
@@ -936,10 +944,12 @@ function renderMemory(mem, stats) {
function renderBarChart(containerId, defs, data, total) {
total = total || 1;
document.getElementById(containerId).innerHTML = defs.map(d => {
const count = data[d.key] || 0;
const pct = ((count / total) * 100).toFixed(1);
return `
document.getElementById(containerId).innerHTML = defs
.filter(d => (data[d.key] || 0) > 0)
.map(d => {
const count = data[d.key] || 0;
const pct = ((count / total) * 100).toFixed(1);
return `
<div class="path-bar-row">
<span class="path-label">${d.label}</span>
<div class="path-bar-track">
@@ -947,7 +957,7 @@ function renderBarChart(containerId, defs, data, total) {
</div>
<span class="path-pct">${pct}%</span>
</div>`;
}).join('');
}).join('');
}
function encryptionPct(transport) {
@@ -957,6 +967,7 @@ function encryptionPct(transport) {
const PATH_DEFS = [
{ key: 'forwarded', label: 'Forward', cls: 'forward' },
{ key: 'upstream', label: 'Upstream', cls: 'upstream' },
{ key: 'recursive', label: 'Recursive', cls: 'recursive' },
{ key: 'cached', label: 'Cached', cls: 'cached' },
{ key: 'local', label: 'Local', cls: 'local' },
@@ -1130,16 +1141,23 @@ async function refresh() {
document.getElementById('totalQueries').textContent = formatNumber(q.total);
document.getElementById('uptime').textContent = formatUptime(stats.uptime_secs);
document.getElementById('uptimeSub').textContent = formatUptimeSub(stats.uptime_secs);
document.getElementById('headerVersion').textContent = stats.version ? 'v' + stats.version : '';
document.getElementById('footerUpstream').textContent = stats.upstream || '';
document.getElementById('footerConfig').textContent = stats.config_path || '';
document.getElementById('footerData').textContent = stats.data_dir || '';
const modeEl = document.getElementById('footerMode');
modeEl.textContent = stats.mode || '—';
modeEl.style.color = stats.mode === 'recursive' ? 'var(--emerald)' : 'var(--amber)';
document.getElementById('footerDnssec').textContent = stats.dnssec ? 'on' : 'off';
document.getElementById('footerDnssec').style.color = stats.dnssec ? 'var(--emerald)' : 'var(--text-dim)';
document.getElementById('footerSrtt').textContent = stats.srtt ? 'on' : 'off';
document.getElementById('footerSrtt').style.color = stats.srtt ? 'var(--emerald)' : 'var(--text-dim)';
if (!document.getElementById('footerLogs').textContent) {
const isWin = stats.data_dir && stats.data_dir.includes(':\\');
const isMac = stats.data_dir && stats.data_dir.includes('/usr/local/');
const logsEl = document.getElementById('footerLogs');
logsEl.textContent = isWin
? stats.data_dir + '\\numa.log'
: isMac ? '/usr/local/var/log/numa.log'
: 'journalctl -u numa -f';
}
// LAN status indicator
const lanEl = document.getElementById('lanToggle');
@@ -1209,7 +1227,7 @@ async function refresh() {
prevTime = now;
// Cache hit rate
const answered = q.cached + q.forwarded + q.recursive + q.coalesced + q.local + q.overridden;
const answered = q.cached + q.forwarded + q.upstream + q.recursive + q.coalesced + q.local + q.overridden;
const hitRate = answered > 0 ? ((q.cached / answered) * 100).toFixed(1) : '0.0';
document.getElementById('cacheRate').textContent = hitRate + '%';
@@ -1339,6 +1357,7 @@ function renderBlockingInfo(info) {
}
function renderAllowlist(entries) {
if (document.activeElement && document.activeElement.id === 'allowDomainInput') return;
const el = document.getElementById('blockingAllowlist');
const count = entries.length;
el.innerHTML = `
@@ -1498,14 +1517,14 @@ refresh();
setInterval(refresh, 2000);
</script>
<div style="text-align:center;padding:0.8rem;font-family:var(--font-mono);font-size:0.68rem;color:var(--text-dim);">
<div style="text-align:center;padding:0.8rem 0.8rem 0.4rem;font-family:var(--font-mono);font-size:0.68rem;color:var(--text-dim);line-height:1.8;">
Config: <span id="footerConfig" style="user-select:all;color:var(--emerald);"></span>
· Data: <span id="footerData" style="user-select:all;color:var(--emerald);"></span>
· Upstream: <span id="footerUpstream" style="user-select:all;color:var(--emerald);"></span>
· Mode: <span id="footerMode" style="color:var(--text-dim);"></span>
· Logs: <span id="footerLogs" style="user-select:all;color:var(--emerald);"></span>
<br>
Upstream: <span id="footerUpstream" style="user-select:all;color:var(--emerald);"></span>
· DNSSEC: <span id="footerDnssec" style="color:var(--text-dim);"></span>
· SRTT: <span id="footerSrtt" style="color:var(--text-dim);"></span>
· Logs: <span style="user-select:all;color:var(--emerald);">macOS: /usr/local/var/log/numa.log · Linux: journalctl -u numa -f</span>
· <a href="https://github.com/razvandimescu/numa" target="_blank" rel="noopener" style="color:var(--amber);text-decoration:none;">GitHub</a>
</div>

View File

@@ -160,6 +160,7 @@ struct QueryLogResponse {
#[derive(Serialize)]
struct StatsResponse {
version: &'static str,
uptime_secs: u64,
upstream: String,
mode: &'static str, // "recursive" or "forward" — never "auto" at runtime
@@ -201,6 +202,7 @@ struct LanStatsResponse {
struct QueriesStats {
total: u64,
forwarded: u64,
upstream: u64,
recursive: u64,
coalesced: u64,
cached: u64,
@@ -538,6 +540,7 @@ async fn stats(State(ctx): State<Arc<ServerCtx>>) -> Json<StatsResponse> {
};
Json(StatsResponse {
version: crate::version(),
uptime_secs: snap.uptime_secs,
upstream,
mode: ctx.upstream_mode.as_str(),
@@ -548,6 +551,7 @@ async fn stats(State(ctx): State<Arc<ServerCtx>>) -> Json<StatsResponse> {
queries: QueriesStats {
total: snap.total,
forwarded: snap.forwarded,
upstream: snap.upstream,
recursive: snap.recursive,
coalesced: snap.coalesced,
cached: snap.cached,

View File

@@ -246,7 +246,7 @@ pub async fn resolve_query(
.await
{
Ok(resp_wire) => match cache_and_parse(ctx, &qname, qtype, &resp_wire) {
Ok(resp) => (resp, QueryPath::Forwarded, DnssecStatus::Indeterminate),
Ok(resp) => (resp, QueryPath::Upstream, DnssecStatus::Indeterminate),
Err(e) => {
error!("{} | {:?} {} | PARSE ERROR | {}", src_addr, qtype, qname, e);
(
@@ -1253,4 +1253,29 @@ mod tests {
other => panic!("expected A record, got {:?}", other),
}
}
#[tokio::test]
async fn pipeline_default_pool_reports_upstream_path() {
let mut upstream_resp = DnsPacket::new();
upstream_resp.header.response = true;
upstream_resp.header.rescode = ResultCode::NOERROR;
upstream_resp.answers.push(DnsRecord::A {
domain: "example.com".to_string(),
addr: Ipv4Addr::new(93, 184, 216, 34),
ttl: 300,
});
let upstream_addr = crate::testutil::mock_upstream(upstream_resp).await;
let ctx = crate::testutil::test_ctx().await;
ctx.upstream_pool
.lock()
.unwrap()
.set_primary(vec![Upstream::Udp(upstream_addr)]);
let ctx = Arc::new(ctx);
let (resp, path) = resolve_in_test(&ctx, "example.com", QueryType::A).await;
assert_eq!(path, QueryPath::Upstream);
assert_eq!(resp.header.rescode, ResultCode::NOERROR);
assert_eq!(resp.answers.len(), 1);
}
}

View File

@@ -43,7 +43,7 @@ impl HealthMeta {
#[cfg(test)]
pub fn test_fixture() -> Self {
HealthMeta {
version: env!("CARGO_PKG_VERSION"),
version: crate::version(),
hostname: "test-host".to_string(),
sni: "numa.numa".to_string(),
dot_enabled: false,
@@ -99,7 +99,7 @@ impl HealthMeta {
}
HealthMeta {
version: env!("CARGO_PKG_VERSION"),
version: crate::version(),
hostname: crate::hostname(),
sni: "numa.numa".to_string(),
dot_enabled,

View File

@@ -20,6 +20,7 @@ pub mod query_log;
pub mod question;
pub mod record;
pub mod recursive;
pub mod serve;
pub mod service_store;
pub mod setup_phone;
pub mod srtt;
@@ -28,12 +29,23 @@ pub mod system_dns;
pub mod tls;
pub mod wire;
#[cfg(windows)]
pub mod windows_service;
#[cfg(test)]
pub(crate) mod testutil;
pub type Error = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T> = std::result::Result<T, Error>;
/// Build version string. On tagged releases: `0.13.1`. On commits ahead
/// of a tag: `0.13.1+a87f907`. With uncommitted changes: `0.13.1+a87f907-dirty`.
/// Falls back to `CARGO_PKG_VERSION` when built outside a git repo (e.g.
/// from a source tarball).
pub fn version() -> &'static str {
option_env!("NUMA_BUILD_VERSION").unwrap_or(env!("CARGO_PKG_VERSION"))
}
/// Detect the machine hostname via the `hostname` command. Returns the
/// full hostname (e.g., `macbook-pro.local`), or `"numa"` if the command
/// fails. Call sites that need the short form (e.g., mDNS instance
@@ -89,14 +101,11 @@ where
/// Linux root daemon: /var/lib/numa (FHS) — falls back to /usr/local/var/numa
/// if a pre-v0.10.1 install already lives there.
/// macOS root daemon: /usr/local/var/numa (Homebrew prefix)
/// Windows: %APPDATA%\numa
/// Windows: %PROGRAMDATA%\numa (same as data_dir — no per-user config on Windows)
pub fn config_dir() -> std::path::PathBuf {
#[cfg(windows)]
{
std::path::PathBuf::from(
std::env::var("APPDATA").unwrap_or_else(|_| "C:\\ProgramData".into()),
)
.join("numa")
data_dir()
}
#[cfg(not(windows))]
{

View File

@@ -1,51 +1,49 @@
use std::net::SocketAddr;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use arc_swap::ArcSwap;
use log::{error, info};
use tokio::net::UdpSocket;
use numa::blocklist::{download_blocklists, parse_blocklist, BlocklistStore};
use numa::buffer::BytePacketBuffer;
use numa::cache::DnsCache;
use numa::config::{build_zone_map, load_config, ConfigLoad};
use numa::ctx::{handle_query, ServerCtx};
use numa::forward::{parse_upstream, Upstream, UpstreamPool};
use numa::override_store::OverrideStore;
use numa::query_log::QueryLog;
use numa::service_store::ServiceStore;
use numa::stats::{ServerStats, Transport};
use numa::system_dns::{
discover_system_dns, install_service, restart_service, service_status, uninstall_service,
install_service, restart_service, service_status, start_service, stop_service,
uninstall_service,
};
const QUAD9_IP: &str = "9.9.9.9";
const DOH_FALLBACK: &str = "https://9.9.9.9/dns-query";
fn main() -> numa::Result<()> {
// Handle CLI subcommands
let arg1 = std::env::args().nth(1).unwrap_or_default();
#[cfg(windows)]
if arg1 == "--service" {
// Running under SCM — stderr goes nowhere. Redirect logs to a file.
let log_path = numa::data_dir().join("numa.log");
let log_file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)
.expect("failed to open log file");
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.format_timestamp_millis()
.target(env_logger::Target::Pipe(Box::new(log_file)))
.init();
numa::windows_service::run_as_service()
.map_err(|e| format!("windows service dispatcher failed: {}", e))?;
return Ok(());
}
#[tokio::main]
async fn main() -> numa::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.format_timestamp_millis()
.init();
// Handle CLI subcommands
let arg1 = std::env::args().nth(1).unwrap_or_default();
match arg1.as_str() {
"install" => {
eprintln!("\x1b[1;38;2;192;98;58mNuma\x1b[0m — installing\n");
eprintln!("\x1b[1;38;5;166mNuma\x1b[0m — installing\n");
return install_service().map_err(|e| e.into());
}
"uninstall" => {
eprintln!("\x1b[1;38;2;192;98;58mNuma\x1b[0m — uninstalling\n");
eprintln!("\x1b[1;38;5;166mNuma\x1b[0m — uninstalling\n");
return uninstall_service().map_err(|e| e.into());
}
"service" => {
let sub = std::env::args().nth(2).unwrap_or_default();
eprintln!("\x1b[1;38;2;192;98;58mNuma\x1b[0m — service management\n");
eprintln!("\x1b[1;38;5;166mNuma\x1b[0m — service management\n");
return match sub.as_str() {
"start" => install_service().map_err(|e| e.into()),
"stop" => uninstall_service().map_err(|e| e.into()),
"start" => start_service().map_err(|e| e.into()),
"stop" => stop_service().map_err(|e| e.into()),
"restart" => restart_service().map_err(|e| e.into()),
"status" => service_status().map_err(|e| e.into()),
_ => {
@@ -55,7 +53,12 @@ async fn main() -> numa::Result<()> {
};
}
"setup-phone" => {
return numa::setup_phone::run().await.map_err(|e| e.into());
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
return runtime
.block_on(numa::setup_phone::run())
.map_err(|e| e.into());
}
"lan" => {
let sub = std::env::args().nth(2).unwrap_or_default();
@@ -102,7 +105,7 @@ async fn main() -> numa::Result<()> {
&& !arg1.ends_with(".toml")
{
eprintln!(
"\x1b[1;38;2;192;98;58mNuma\x1b[0m — unknown command: \x1b[1m{}\x1b[0m\n",
"\x1b[1;38;5;166mNuma\x1b[0m — unknown command: \x1b[1m{}\x1b[0m\n",
arg1
);
eprintln!("Run \x1b[1mnuma help\x1b[0m for a list of commands.");
@@ -118,552 +121,11 @@ async fn main() -> numa::Result<()> {
} else {
arg1 // treat as config path for backwards compatibility
};
let ConfigLoad {
config,
path: resolved_config_path,
found: config_found,
} = load_config(&config_path)?;
// Discover system DNS in a single pass (upstream + forwarding rules)
let system_dns = discover_system_dns();
let root_hints = numa::recursive::parse_root_hints(&config.upstream.root_hints);
let recursive_pool = || {
let dummy = UpstreamPool::new(vec![Upstream::Udp("0.0.0.0:0".parse().unwrap())], vec![]);
(dummy, "recursive (root hints)".to_string())
};
let (resolved_mode, upstream_auto, pool, upstream_label) = match config.upstream.mode {
numa::config::UpstreamMode::Auto => {
info!("auto mode: probing recursive resolution...");
if numa::recursive::probe_recursive(&root_hints).await {
info!("recursive probe succeeded — self-sovereign mode");
let (pool, label) = recursive_pool();
(numa::config::UpstreamMode::Recursive, false, pool, label)
} else {
log::warn!("recursive probe failed — falling back to Quad9 DoH");
let client = reqwest::Client::builder()
.use_rustls_tls()
.build()
.unwrap_or_default();
let url = DOH_FALLBACK.to_string();
let label = url.clone();
let pool = UpstreamPool::new(vec![Upstream::Doh { url, client }], vec![]);
(numa::config::UpstreamMode::Forward, false, pool, label)
}
}
numa::config::UpstreamMode::Recursive => {
let (pool, label) = recursive_pool();
(numa::config::UpstreamMode::Recursive, false, pool, label)
}
numa::config::UpstreamMode::Forward => {
let addrs = if config.upstream.address.is_empty() {
let detected = system_dns
.default_upstream
.or_else(numa::system_dns::detect_dhcp_dns)
.unwrap_or_else(|| {
info!("could not detect system DNS, falling back to Quad9 DoH");
DOH_FALLBACK.to_string()
});
vec![detected]
} else {
config.upstream.address.clone()
};
let primary: Vec<Upstream> = addrs
.iter()
.map(|s| parse_upstream(s, config.upstream.port))
.collect::<numa::Result<Vec<_>>>()?;
let fallback: Vec<Upstream> = config
.upstream
.fallback
.iter()
.map(|s| parse_upstream(s, config.upstream.port))
.collect::<numa::Result<Vec<_>>>()?;
let pool = UpstreamPool::new(primary, fallback);
let label = pool.label();
(
numa::config::UpstreamMode::Forward,
config.upstream.address.is_empty(),
pool,
label,
)
}
};
let api_port = config.server.api_port;
let mut blocklist = BlocklistStore::new();
for domain in &config.blocking.allowlist {
blocklist.add_to_allowlist(domain);
}
if !config.blocking.enabled {
blocklist.set_enabled(false);
}
// Build service store: config services + persisted user services
let mut service_store = ServiceStore::new();
service_store.insert_from_config("numa", config.server.api_port, Vec::new());
for svc in &config.services {
service_store.insert_from_config(&svc.name, svc.target_port, svc.routes.clone());
}
service_store.load_persisted();
for fwd in &config.forwarding {
for suffix in &fwd.suffix {
info!("forwarding .{} to {} (config rule)", suffix, fwd.upstream);
}
}
let forwarding_rules =
numa::config::merge_forwarding_rules(&config.forwarding, system_dns.forwarding_rules)?;
// Resolve data_dir from config, falling back to the platform default.
// Used for TLS CA storage below and stored on ServerCtx for runtime use.
let resolved_data_dir = config
.server
.data_dir
.clone()
.unwrap_or_else(numa::data_dir);
// Build initial TLS config before ServerCtx (so ArcSwap is ready at construction)
let initial_tls = if config.proxy.enabled && config.proxy.tls_port > 0 {
let service_names = service_store.names();
match numa::tls::build_tls_config(
&config.proxy.tld,
&service_names,
Vec::new(),
&resolved_data_dir,
) {
Ok(tls_config) => Some(ArcSwap::from(tls_config)),
Err(e) => {
if let Some(advisory) = numa::tls::try_data_dir_advisory(&e, &resolved_data_dir) {
eprint!("{}", advisory);
} else {
log::warn!("TLS setup failed, HTTPS proxy disabled: {}", e);
}
None
}
}
} else {
None
};
let doh_enabled = initial_tls.is_some();
let health_meta = numa::health::HealthMeta::build(
&resolved_data_dir,
config.dot.enabled,
config.dot.port,
config.mobile.port,
config.dnssec.enabled,
resolved_mode == numa::config::UpstreamMode::Recursive,
config.lan.enabled,
config.blocking.enabled,
doh_enabled,
);
let ca_pem = std::fs::read_to_string(resolved_data_dir.join("ca.pem")).ok();
let socket = match UdpSocket::bind(&config.server.bind_addr).await {
Ok(s) => s,
Err(e) => {
if let Some(advisory) =
numa::system_dns::try_port53_advisory(&config.server.bind_addr, &e)
{
eprint!("{}", advisory);
std::process::exit(1);
}
return Err(e.into());
}
};
let ctx = Arc::new(ServerCtx {
socket,
zone_map: build_zone_map(&config.zones)?,
cache: RwLock::new(DnsCache::new(
config.cache.max_entries,
config.cache.min_ttl,
config.cache.max_ttl,
)),
refreshing: Mutex::new(std::collections::HashSet::new()),
stats: Mutex::new(ServerStats::new()),
overrides: RwLock::new(OverrideStore::new()),
blocklist: RwLock::new(blocklist),
query_log: Mutex::new(QueryLog::new(1000)),
services: Mutex::new(service_store),
lan_peers: Mutex::new(numa::lan::PeerStore::new(config.lan.peer_timeout_secs)),
forwarding_rules,
upstream_pool: Mutex::new(pool),
upstream_auto,
upstream_port: config.upstream.port,
lan_ip: Mutex::new(numa::lan::detect_lan_ip().unwrap_or(std::net::Ipv4Addr::LOCALHOST)),
timeout: Duration::from_millis(config.upstream.timeout_ms),
hedge_delay: Duration::from_millis(config.upstream.hedge_ms),
proxy_tld_suffix: if config.proxy.tld.is_empty() {
String::new()
} else {
format!(".{}", config.proxy.tld)
},
proxy_tld: config.proxy.tld.clone(),
lan_enabled: config.lan.enabled,
config_path: resolved_config_path,
config_found,
config_dir: numa::config_dir(),
data_dir: resolved_data_dir,
tls_config: initial_tls,
upstream_mode: resolved_mode,
root_hints,
srtt: std::sync::RwLock::new(numa::srtt::SrttCache::new(config.upstream.srtt)),
inflight: std::sync::Mutex::new(std::collections::HashMap::new()),
dnssec_enabled: config.dnssec.enabled,
dnssec_strict: config.dnssec.strict,
health_meta,
ca_pem,
mobile_enabled: config.mobile.enabled,
mobile_port: config.mobile.port,
});
let zone_count: usize = ctx.zone_map.values().map(|m| m.len()).sum();
// Build banner rows, then size the box to fit the longest value
let api_url = format!("http://localhost:{}", api_port);
let proxy_label = if config.proxy.enabled {
if config.proxy.tls_port > 0 {
Some(format!(
"http://:{} https://:{}",
config.proxy.port, config.proxy.tls_port
))
} else {
Some(format!(
"http://*.{} on :{}",
config.proxy.tld, config.proxy.port
))
}
} else {
None
};
let config_label = if ctx.config_found {
ctx.config_path.clone()
} else {
format!("{} (defaults)", ctx.config_path)
};
let data_label = ctx.data_dir.display().to_string();
let services_label = ctx.config_dir.join("services.json").display().to_string();
// label (10) + value + padding (2) = inner width; minimum 40 for the title row
let val_w = [
config.server.bind_addr.len(),
api_url.len(),
upstream_label.len(),
config_label.len(),
data_label.len(),
services_label.len(),
]
.into_iter()
.chain(proxy_label.as_ref().map(|s| s.len()))
.max()
.unwrap_or(30);
let w = (val_w + 12).max(42); // 10 label + 2 padding, min 42 for title
let o = "\x1b[38;2;192;98;58m"; // orange
let g = "\x1b[38;2;107;124;78m"; // green
let d = "\x1b[38;2;163;152;136m"; // dim
let r = "\x1b[0m"; // reset
let b = "\x1b[1;38;2;192;98;58m"; // bold orange
let it = "\x1b[3;38;2;163;152;136m"; // italic dim
let bar_top = "".repeat(w);
let bar_mid = "".repeat(w);
let row = |label: &str, color: &str, value: &str| {
eprintln!(
"{o}{r} {color}{:<9}{r} {:<vw$}{o}{r}",
label,
value,
vw = w - 12
);
};
// Title row: center within the box
let title = format!(
"{b}NUMA{r} {it}DNS that governs itself{r} {d}v{}{r}",
env!("CARGO_PKG_VERSION")
);
// The title contains ANSI codes; visible length is ~38 chars. Pad to fill the box.
let title_visible_len = 4 + 2 + 24 + 2 + 1 + env!("CARGO_PKG_VERSION").len() + 1;
let title_pad = w.saturating_sub(title_visible_len);
eprintln!("\n{o}{bar_top}{r}");
eprint!("{o}{r} {title}");
eprintln!("{}{o}{r}", " ".repeat(title_pad));
eprintln!("{o}{bar_top}{r}");
row("DNS", g, &config.server.bind_addr);
row("API", g, &api_url);
row("Dashboard", g, &api_url);
row(
"Upstream",
g,
if ctx.upstream_mode == numa::config::UpstreamMode::Recursive {
"recursive (root hints)"
} else {
&upstream_label
},
);
row("Zones", g, &format!("{} records", zone_count));
row(
"Cache",
g,
&format!("max {} entries", config.cache.max_entries),
);
if !config.cache.warm.is_empty() {
row("Warm", g, &format!("{} domains", config.cache.warm.len()));
}
row(
"Blocking",
g,
&if config.blocking.enabled {
format!("{} lists", config.blocking.lists.len())
} else {
"disabled".to_string()
},
);
if let Some(ref label) = proxy_label {
row("Proxy", g, label);
if config.proxy.bind_addr == "127.0.0.1" {
let y = "\x1b[38;2;204;176;59m"; // yellow
row(
"",
y,
&format!(
"⚠ proxy on 127.0.0.1 — .{} not LAN reachable",
config.proxy.tld
),
);
}
}
if config.dot.enabled {
row("DoT", g, &format!("tls://:{}", config.dot.port));
}
if doh_enabled {
row(
"DoH",
g,
&format!("https://:{}/dns-query", config.proxy.tls_port),
);
}
if config.lan.enabled {
row("LAN", g, "mDNS (_numa._tcp.local)");
}
if !ctx.forwarding_rules.is_empty() {
row(
"Routing",
g,
&format!("{} conditional rules", ctx.forwarding_rules.len()),
);
}
eprintln!("{o}{bar_mid}{r}");
row("Config", d, &config_label);
row("Data", d, &data_label);
row("Services", d, &services_label);
eprintln!("{o}{bar_top}{r}\n");
info!(
"numa listening on {}, upstream {}, {} zone records, cache max {}, API on port {}",
config.server.bind_addr, upstream_label, zone_count, config.cache.max_entries, api_port,
);
// Download blocklists on startup
let blocklist_lists = config.blocking.lists.clone();
let refresh_hours = config.blocking.refresh_hours;
if config.blocking.enabled && !blocklist_lists.is_empty() {
let bl_ctx = Arc::clone(&ctx);
let bl_lists = blocklist_lists.clone();
tokio::spawn(async move {
load_blocklists(&bl_ctx, &bl_lists).await;
// Periodic refresh
let mut interval = tokio::time::interval(Duration::from_secs(refresh_hours * 3600));
interval.tick().await; // skip immediate tick
loop {
interval.tick().await;
info!("refreshing blocklists...");
load_blocklists(&bl_ctx, &bl_lists).await;
}
});
}
// Prime TLD cache (recursive mode only)
if ctx.upstream_mode == numa::config::UpstreamMode::Recursive {
let prime_ctx = Arc::clone(&ctx);
let prime_tlds = config.upstream.prime_tlds;
tokio::spawn(async move {
numa::recursive::prime_tld_cache(
&prime_ctx.cache,
&prime_ctx.root_hints,
&prime_tlds,
&prime_ctx.srtt,
)
.await;
});
}
// Spawn cache warming for user-configured domains
if !config.cache.warm.is_empty() {
let warm_ctx = Arc::clone(&ctx);
let warm_domains = config.cache.warm.clone();
tokio::spawn(async move {
cache_warm_loop(warm_ctx, warm_domains).await;
});
}
// Spawn DoH connection keepalive — prevents idle TLS teardown
{
let keepalive_ctx = Arc::clone(&ctx);
tokio::spawn(async move {
doh_keepalive_loop(keepalive_ctx).await;
});
}
// Spawn HTTP API server
let api_ctx = Arc::clone(&ctx);
let api_addr: SocketAddr = format!("{}:{}", config.server.api_bind_addr, api_port).parse()?;
tokio::spawn(async move {
let app = numa::api::router(api_ctx);
let listener = tokio::net::TcpListener::bind(api_addr).await.unwrap();
info!("HTTP API listening on {}", api_addr);
axum::serve(listener, app).await.unwrap();
});
// Spawn Mobile API listener (read-only subset for iOS/Android companion
// apps, LAN-bound by default so phones can reach it). Only idempotent
// GETs; no state-mutating routes are exposed here regardless of
// the main API's bind address.
if config.mobile.enabled {
let mobile_ctx = Arc::clone(&ctx);
let mobile_bind = config.mobile.bind_addr.clone();
let mobile_port = config.mobile.port;
tokio::spawn(async move {
if let Err(e) = numa::mobile_api::start(mobile_ctx, mobile_bind, mobile_port).await {
log::warn!("Mobile API listener failed: {}", e);
}
});
}
let proxy_bind: std::net::Ipv4Addr = config
.proxy
.bind_addr
.parse()
.unwrap_or(std::net::Ipv4Addr::LOCALHOST);
// Spawn HTTP reverse proxy for .numa domains
if config.proxy.enabled {
let proxy_ctx = Arc::clone(&ctx);
let proxy_port = config.proxy.port;
tokio::spawn(async move {
numa::proxy::start_proxy(proxy_ctx, proxy_port, proxy_bind).await;
});
}
// Spawn HTTPS reverse proxy with TLS termination
if config.proxy.enabled && config.proxy.tls_port > 0 && ctx.tls_config.is_some() {
let proxy_ctx = Arc::clone(&ctx);
let tls_port = config.proxy.tls_port;
tokio::spawn(async move {
numa::proxy::start_proxy_tls(proxy_ctx, tls_port, proxy_bind).await;
});
}
// Spawn network change watcher (upstream re-detection, LAN IP update, peer flush)
{
let watch_ctx = Arc::clone(&ctx);
tokio::spawn(async move {
network_watch_loop(watch_ctx).await;
});
}
// Spawn LAN service discovery
if config.lan.enabled {
let lan_ctx = Arc::clone(&ctx);
let lan_config = config.lan.clone();
tokio::spawn(async move {
numa::lan::start_lan_discovery(lan_ctx, &lan_config).await;
});
}
// Spawn DNS-over-TLS listener (RFC 7858)
if config.dot.enabled {
let dot_ctx = Arc::clone(&ctx);
let dot_config = config.dot.clone();
tokio::spawn(async move {
numa::dot::start_dot(dot_ctx, &dot_config).await;
});
}
// UDP DNS listener
#[allow(clippy::infinite_loop)]
loop {
let mut buffer = BytePacketBuffer::new();
let (len, src_addr) = match ctx.socket.recv_from(&mut buffer.buf).await {
Ok(r) => r,
Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => {
// Windows delivers ICMP port-unreachable as ConnectionReset on UDP sockets
continue;
}
Err(e) => return Err(e.into()),
};
let ctx = Arc::clone(&ctx);
tokio::spawn(async move {
if let Err(e) = handle_query(buffer, len, src_addr, &ctx, Transport::Udp).await {
error!("{} | HANDLER ERROR | {}", src_addr, e);
}
});
}
}
async fn network_watch_loop(ctx: Arc<numa::ctx::ServerCtx>) {
let mut tick: u64 = 0;
let mut interval = tokio::time::interval(Duration::from_secs(5));
interval.tick().await; // skip immediate tick
loop {
interval.tick().await;
tick += 1;
let mut changed = false;
// Check LAN IP change (every 5s — cheap, one UDP socket call)
if let Some(new_ip) = numa::lan::detect_lan_ip() {
let mut current_ip = ctx.lan_ip.lock().unwrap();
if new_ip != *current_ip {
info!("LAN IP changed: {} → {}", current_ip, new_ip);
*current_ip = new_ip;
changed = true;
numa::recursive::reset_udp_state();
}
}
// Re-detect upstream every 30s or on LAN IP change (auto-detect only)
if ctx.upstream_auto && (changed || tick.is_multiple_of(6)) {
let dns_info = numa::system_dns::discover_system_dns();
let new_addr = dns_info
.default_upstream
.or_else(numa::system_dns::detect_dhcp_dns)
.unwrap_or_else(|| QUAD9_IP.to_string());
let mut pool = ctx.upstream_pool.lock().unwrap();
if pool.maybe_update_primary(&new_addr, ctx.upstream_port) {
info!("upstream changed → {}", pool.label());
changed = true;
}
}
// Flush stale LAN peers on any network change
if changed {
ctx.lan_peers.lock().unwrap().clear();
info!("flushed LAN peers after network change");
}
// Re-probe UDP every 5 minutes when disabled
if tick.is_multiple_of(60) {
numa::recursive::probe_udp(&ctx.root_hints).await;
}
}
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
runtime.block_on(numa::serve::run(config_path))
}
fn set_lan_enabled(enabled: bool, path: &str) -> numa::Result<()> {
@@ -723,78 +185,10 @@ fn print_lan_status(enabled: bool) {
let label = if enabled { "enabled" } else { "disabled" };
let color = if enabled { "32" } else { "33" };
eprintln!(
"\x1b[1;38;2;192;98;58mNuma\x1b[0m — LAN discovery \x1b[{}m{}\x1b[0m",
"\x1b[1;38;5;166mNuma\x1b[0m — LAN discovery \x1b[{}m{}\x1b[0m",
color, label
);
if enabled {
eprintln!(" Restart Numa to start mDNS discovery");
}
}
async fn load_blocklists(ctx: &ServerCtx, lists: &[String]) {
let downloaded = download_blocklists(lists).await;
// Parse outside the lock to avoid blocking DNS queries during parse (~100ms)
let mut all_domains = std::collections::HashSet::new();
let mut sources = Vec::new();
for (source, text) in &downloaded {
let domains = parse_blocklist(text);
info!("blocklist: {} domains from {}", domains.len(), source);
all_domains.extend(domains);
sources.push(source.clone());
}
let total = all_domains.len();
// Swap under lock — sub-microsecond
ctx.blocklist
.write()
.unwrap()
.swap_domains(all_domains, sources);
info!(
"blocking enabled: {} unique domains from {} lists",
total,
downloaded.len()
);
}
async fn warm_domain(ctx: &ServerCtx, domain: &str) {
for qtype in [
numa::question::QueryType::A,
numa::question::QueryType::AAAA,
] {
numa::ctx::refresh_entry(ctx, domain, qtype).await;
}
}
async fn doh_keepalive_loop(ctx: Arc<ServerCtx>) {
let mut interval = tokio::time::interval(Duration::from_secs(25));
interval.tick().await; // skip first immediate tick
loop {
interval.tick().await;
let pool = ctx.upstream_pool.lock().unwrap().clone();
if let Some(upstream) = pool.preferred() {
numa::forward::keepalive_doh(upstream).await;
}
}
}
async fn cache_warm_loop(ctx: Arc<ServerCtx>, domains: Vec<String>) {
tokio::time::sleep(Duration::from_secs(2)).await;
for domain in &domains {
warm_domain(&ctx, domain).await;
}
info!("cache warm: {} domains resolved at startup", domains.len());
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.tick().await;
loop {
interval.tick().await;
for domain in &domains {
let refresh = ctx.cache.read().unwrap().needs_warm(domain);
if refresh {
warm_domain(&ctx, domain).await;
}
}
}
}

646
src/serve.rs Normal file
View File

@@ -0,0 +1,646 @@
//! The main DNS-server runtime.
//!
//! Extracted from `main.rs` so both the interactive CLI entry and the
//! Windows service dispatcher (`windows_service` module) can drive the
//! same startup/serve loop.
use std::net::SocketAddr;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use arc_swap::ArcSwap;
use log::{error, info};
use tokio::net::UdpSocket;
use crate::blocklist::{download_blocklists, parse_blocklist, BlocklistStore};
use crate::buffer::BytePacketBuffer;
use crate::cache::DnsCache;
use crate::config::{build_zone_map, load_config, ConfigLoad};
use crate::ctx::{handle_query, ServerCtx};
use crate::forward::{parse_upstream, Upstream, UpstreamPool};
use crate::override_store::OverrideStore;
use crate::query_log::QueryLog;
use crate::service_store::ServiceStore;
use crate::stats::{ServerStats, Transport};
use crate::system_dns::discover_system_dns;
const QUAD9_IP: &str = "9.9.9.9";
const DOH_FALLBACK: &str = "https://9.9.9.9/dns-query";
/// Boot the DNS server and run until the UDP listener errors out.
pub async fn run(config_path: String) -> crate::Result<()> {
let ConfigLoad {
config,
path: resolved_config_path,
found: config_found,
} = load_config(&config_path)?;
// Discover system DNS in a single pass (upstream + forwarding rules)
let system_dns = discover_system_dns();
let root_hints = crate::recursive::parse_root_hints(&config.upstream.root_hints);
let recursive_pool = || {
let dummy = UpstreamPool::new(vec![Upstream::Udp("0.0.0.0:0".parse().unwrap())], vec![]);
(dummy, "recursive (root hints)".to_string())
};
let (resolved_mode, upstream_auto, pool, upstream_label) = match config.upstream.mode {
crate::config::UpstreamMode::Auto => {
info!("auto mode: probing recursive resolution...");
if crate::recursive::probe_recursive(&root_hints).await {
info!("recursive probe succeeded — self-sovereign mode");
let (pool, label) = recursive_pool();
(crate::config::UpstreamMode::Recursive, false, pool, label)
} else {
log::warn!("recursive probe failed — falling back to Quad9 DoH");
let client = reqwest::Client::builder()
.use_rustls_tls()
.build()
.unwrap_or_default();
let url = DOH_FALLBACK.to_string();
let label = url.clone();
let pool = UpstreamPool::new(vec![Upstream::Doh { url, client }], vec![]);
(crate::config::UpstreamMode::Forward, false, pool, label)
}
}
crate::config::UpstreamMode::Recursive => {
let (pool, label) = recursive_pool();
(crate::config::UpstreamMode::Recursive, false, pool, label)
}
crate::config::UpstreamMode::Forward => {
let addrs = if config.upstream.address.is_empty() {
let detected = system_dns
.default_upstream
.or_else(crate::system_dns::detect_dhcp_dns)
.unwrap_or_else(|| {
info!("could not detect system DNS, falling back to Quad9 DoH");
DOH_FALLBACK.to_string()
});
vec![detected]
} else {
config.upstream.address.clone()
};
let primary: Vec<Upstream> = addrs
.iter()
.map(|s| parse_upstream(s, config.upstream.port))
.collect::<crate::Result<Vec<_>>>()?;
let fallback: Vec<Upstream> = config
.upstream
.fallback
.iter()
.map(|s| parse_upstream(s, config.upstream.port))
.collect::<crate::Result<Vec<_>>>()?;
let pool = UpstreamPool::new(primary, fallback);
let label = pool.label();
(
crate::config::UpstreamMode::Forward,
config.upstream.address.is_empty(),
pool,
label,
)
}
};
let api_port = config.server.api_port;
let mut blocklist = BlocklistStore::new();
for domain in &config.blocking.allowlist {
blocklist.add_to_allowlist(domain);
}
if !config.blocking.enabled {
blocklist.set_enabled(false);
}
// Build service store: config services + persisted user services
let mut service_store = ServiceStore::new();
service_store.insert_from_config("numa", config.server.api_port, Vec::new());
for svc in &config.services {
service_store.insert_from_config(&svc.name, svc.target_port, svc.routes.clone());
}
service_store.load_persisted();
for fwd in &config.forwarding {
for suffix in &fwd.suffix {
info!("forwarding .{} to {} (config rule)", suffix, fwd.upstream);
}
}
let forwarding_rules =
crate::config::merge_forwarding_rules(&config.forwarding, system_dns.forwarding_rules)?;
// Resolve data_dir from config, falling back to the platform default.
// Used for TLS CA storage below and stored on ServerCtx for runtime use.
let resolved_data_dir = config
.server
.data_dir
.clone()
.unwrap_or_else(crate::data_dir);
// Build initial TLS config before ServerCtx (so ArcSwap is ready at construction)
let initial_tls = if config.proxy.enabled && config.proxy.tls_port > 0 {
let service_names = service_store.names();
match crate::tls::build_tls_config(
&config.proxy.tld,
&service_names,
Vec::new(),
&resolved_data_dir,
) {
Ok(tls_config) => Some(ArcSwap::from(tls_config)),
Err(e) => {
if let Some(advisory) = crate::tls::try_data_dir_advisory(&e, &resolved_data_dir) {
eprint!("{}", advisory);
} else {
log::warn!("TLS setup failed, HTTPS proxy disabled: {}", e);
}
None
}
}
} else {
None
};
let doh_enabled = initial_tls.is_some();
let health_meta = crate::health::HealthMeta::build(
&resolved_data_dir,
config.dot.enabled,
config.dot.port,
config.mobile.port,
config.dnssec.enabled,
resolved_mode == crate::config::UpstreamMode::Recursive,
config.lan.enabled,
config.blocking.enabled,
doh_enabled,
);
let ca_pem = std::fs::read_to_string(resolved_data_dir.join("ca.pem")).ok();
let socket = match UdpSocket::bind(&config.server.bind_addr).await {
Ok(s) => s,
Err(e) => {
if let Some(advisory) =
crate::system_dns::try_port53_advisory(&config.server.bind_addr, &e)
{
eprint!("{}", advisory);
std::process::exit(1);
}
return Err(e.into());
}
};
let ctx = Arc::new(ServerCtx {
socket,
zone_map: build_zone_map(&config.zones)?,
cache: RwLock::new(DnsCache::new(
config.cache.max_entries,
config.cache.min_ttl,
config.cache.max_ttl,
)),
refreshing: Mutex::new(std::collections::HashSet::new()),
stats: Mutex::new(ServerStats::new()),
overrides: RwLock::new(OverrideStore::new()),
blocklist: RwLock::new(blocklist),
query_log: Mutex::new(QueryLog::new(1000)),
services: Mutex::new(service_store),
lan_peers: Mutex::new(crate::lan::PeerStore::new(config.lan.peer_timeout_secs)),
forwarding_rules,
upstream_pool: Mutex::new(pool),
upstream_auto,
upstream_port: config.upstream.port,
lan_ip: Mutex::new(crate::lan::detect_lan_ip().unwrap_or(std::net::Ipv4Addr::LOCALHOST)),
timeout: Duration::from_millis(config.upstream.timeout_ms),
hedge_delay: Duration::from_millis(config.upstream.hedge_ms),
proxy_tld_suffix: if config.proxy.tld.is_empty() {
String::new()
} else {
format!(".{}", config.proxy.tld)
},
proxy_tld: config.proxy.tld.clone(),
lan_enabled: config.lan.enabled,
config_path: resolved_config_path,
config_found,
config_dir: crate::config_dir(),
data_dir: resolved_data_dir,
tls_config: initial_tls,
upstream_mode: resolved_mode,
root_hints,
srtt: std::sync::RwLock::new(crate::srtt::SrttCache::new(config.upstream.srtt)),
inflight: std::sync::Mutex::new(std::collections::HashMap::new()),
dnssec_enabled: config.dnssec.enabled,
dnssec_strict: config.dnssec.strict,
health_meta,
ca_pem,
mobile_enabled: config.mobile.enabled,
mobile_port: config.mobile.port,
});
let zone_count: usize = ctx.zone_map.values().map(|m| m.len()).sum();
// Build banner rows, then size the box to fit the longest value
let api_url = format!("http://localhost:{}", api_port);
let proxy_label = if config.proxy.enabled {
if config.proxy.tls_port > 0 {
Some(format!(
"http://:{} https://:{}",
config.proxy.port, config.proxy.tls_port
))
} else {
Some(format!(
"http://*.{} on :{}",
config.proxy.tld, config.proxy.port
))
}
} else {
None
};
let config_label = if ctx.config_found {
ctx.config_path.clone()
} else {
format!("{} (defaults)", ctx.config_path)
};
let data_label = ctx.data_dir.display().to_string();
let services_label = ctx.config_dir.join("services.json").display().to_string();
// label (10) + value + padding (2) = inner width; minimum 40 for the title row
let val_w = [
config.server.bind_addr.len(),
api_url.len(),
upstream_label.len(),
config_label.len(),
data_label.len(),
services_label.len(),
]
.into_iter()
.chain(proxy_label.as_ref().map(|s| s.len()))
.max()
.unwrap_or(30);
let w = (val_w + 12).max(42); // 10 label + 2 padding, min 42 for title
let o = "\x1b[38;5;166m"; // orange borders (256-color, ~192,98,58)
let g = "\x1b[38;5;101m"; // khaki/olive labels (256-color, ~107,124,78)
let d = "\x1b[38;5;138m"; // warm grey labels (256-color, ~163,152,136)
let r = "\x1b[0m"; // reset
let b = "\x1b[1;38;5;166m"; // bold orange title (256-color)
let it = "\x1b[3;38;5;138m"; // italic warm grey subtitle
let bar_top = "".repeat(w);
let bar_mid = "".repeat(w);
let row = |label: &str, color: &str, value: &str| {
eprintln!(
"{o}{r} {color}{:<9}{r} {:<vw$}{o}{r}",
label,
value,
vw = w - 12
);
};
// Title row: center within the box
let title = format!(
"{b}NUMA{r} {it}DNS that governs itself{r} {d}v{}{r}",
env!("CARGO_PKG_VERSION")
);
// The title contains ANSI codes; visible length is ~38 chars. Pad to fill the box.
let title_visible_len = 4 + 2 + 24 + 2 + 1 + env!("CARGO_PKG_VERSION").len() + 1;
let title_pad = w.saturating_sub(title_visible_len);
eprintln!("\n{o}{bar_top}{r}");
eprint!("{o}{r} {title}");
eprintln!("{}{o}{r}", " ".repeat(title_pad));
eprintln!("{o}{bar_top}{r}");
row("DNS", g, &config.server.bind_addr);
row("API", g, &api_url);
row("Dashboard", g, &api_url);
row(
"Upstream",
g,
if ctx.upstream_mode == crate::config::UpstreamMode::Recursive {
"recursive (root hints)"
} else {
&upstream_label
},
);
row("Zones", g, &format!("{} records", zone_count));
row(
"Cache",
g,
&format!("max {} entries", config.cache.max_entries),
);
if !config.cache.warm.is_empty() {
row("Warm", g, &format!("{} domains", config.cache.warm.len()));
}
row(
"Blocking",
g,
&if config.blocking.enabled {
format!("{} lists", config.blocking.lists.len())
} else {
"disabled".to_string()
},
);
if let Some(ref label) = proxy_label {
row("Proxy", g, label);
if config.proxy.bind_addr == "127.0.0.1" {
let y = "\x1b[33m"; // yellow
row(
"",
y,
&format!(
"⚠ proxy on 127.0.0.1 — .{} not LAN reachable",
config.proxy.tld
),
);
}
}
if config.dot.enabled {
row("DoT", g, &format!("tls://:{}", config.dot.port));
}
if doh_enabled {
row(
"DoH",
g,
&format!("https://:{}/dns-query", config.proxy.tls_port),
);
}
if config.lan.enabled {
row("LAN", g, "mDNS (_numa._tcp.local)");
}
if !ctx.forwarding_rules.is_empty() {
row(
"Routing",
g,
&format!("{} conditional rules", ctx.forwarding_rules.len()),
);
}
eprintln!("{o}{bar_mid}{r}");
row("Config", d, &config_label);
row("Data", d, &data_label);
row("Services", d, &services_label);
eprintln!("{o}{bar_top}{r}\n");
info!(
"numa listening on {}, upstream {}, {} zone records, cache max {}, API on port {}",
config.server.bind_addr, upstream_label, zone_count, config.cache.max_entries, api_port,
);
// Download blocklists on startup
let blocklist_lists = config.blocking.lists.clone();
let refresh_hours = config.blocking.refresh_hours;
if config.blocking.enabled && !blocklist_lists.is_empty() {
let bl_ctx = Arc::clone(&ctx);
let bl_lists = blocklist_lists.clone();
tokio::spawn(async move {
load_blocklists(&bl_ctx, &bl_lists).await;
// Periodic refresh
let mut interval = tokio::time::interval(Duration::from_secs(refresh_hours * 3600));
interval.tick().await; // skip immediate tick
loop {
interval.tick().await;
info!("refreshing blocklists...");
load_blocklists(&bl_ctx, &bl_lists).await;
}
});
}
// Prime TLD cache (recursive mode only)
if ctx.upstream_mode == crate::config::UpstreamMode::Recursive {
let prime_ctx = Arc::clone(&ctx);
let prime_tlds = config.upstream.prime_tlds;
tokio::spawn(async move {
crate::recursive::prime_tld_cache(
&prime_ctx.cache,
&prime_ctx.root_hints,
&prime_tlds,
&prime_ctx.srtt,
)
.await;
});
}
// Spawn cache warming for user-configured domains
if !config.cache.warm.is_empty() {
let warm_ctx = Arc::clone(&ctx);
let warm_domains = config.cache.warm.clone();
tokio::spawn(async move {
cache_warm_loop(warm_ctx, warm_domains).await;
});
}
// Spawn DoH connection keepalive — prevents idle TLS teardown
{
let keepalive_ctx = Arc::clone(&ctx);
tokio::spawn(async move {
doh_keepalive_loop(keepalive_ctx).await;
});
}
// Spawn HTTP API server
let api_ctx = Arc::clone(&ctx);
let api_addr: SocketAddr = format!("{}:{}", config.server.api_bind_addr, api_port).parse()?;
tokio::spawn(async move {
let app = crate::api::router(api_ctx);
let listener = tokio::net::TcpListener::bind(api_addr).await.unwrap();
info!("HTTP API listening on {}", api_addr);
axum::serve(listener, app).await.unwrap();
});
// Spawn Mobile API listener (read-only subset for iOS/Android companion
// apps, LAN-bound by default so phones can reach it). Only idempotent
// GETs; no state-mutating routes are exposed here regardless of
// the main API's bind address.
if config.mobile.enabled {
let mobile_ctx = Arc::clone(&ctx);
let mobile_bind = config.mobile.bind_addr.clone();
let mobile_port = config.mobile.port;
tokio::spawn(async move {
if let Err(e) = crate::mobile_api::start(mobile_ctx, mobile_bind, mobile_port).await {
log::warn!("Mobile API listener failed: {}", e);
}
});
}
let proxy_bind: std::net::Ipv4Addr = config
.proxy
.bind_addr
.parse()
.unwrap_or(std::net::Ipv4Addr::LOCALHOST);
// Spawn HTTP reverse proxy for .numa domains
if config.proxy.enabled {
let proxy_ctx = Arc::clone(&ctx);
let proxy_port = config.proxy.port;
tokio::spawn(async move {
crate::proxy::start_proxy(proxy_ctx, proxy_port, proxy_bind).await;
});
}
// Spawn HTTPS reverse proxy with TLS termination
if config.proxy.enabled && config.proxy.tls_port > 0 && ctx.tls_config.is_some() {
let proxy_ctx = Arc::clone(&ctx);
let tls_port = config.proxy.tls_port;
tokio::spawn(async move {
crate::proxy::start_proxy_tls(proxy_ctx, tls_port, proxy_bind).await;
});
}
// Spawn network change watcher (upstream re-detection, LAN IP update, peer flush)
{
let watch_ctx = Arc::clone(&ctx);
tokio::spawn(async move {
network_watch_loop(watch_ctx).await;
});
}
// Spawn LAN service discovery
if config.lan.enabled {
let lan_ctx = Arc::clone(&ctx);
let lan_config = config.lan.clone();
tokio::spawn(async move {
crate::lan::start_lan_discovery(lan_ctx, &lan_config).await;
});
}
// Spawn DNS-over-TLS listener (RFC 7858)
if config.dot.enabled {
let dot_ctx = Arc::clone(&ctx);
let dot_config = config.dot.clone();
tokio::spawn(async move {
crate::dot::start_dot(dot_ctx, &dot_config).await;
});
}
// UDP DNS listener
#[allow(clippy::infinite_loop)]
loop {
let mut buffer = BytePacketBuffer::new();
let (len, src_addr) = match ctx.socket.recv_from(&mut buffer.buf).await {
Ok(r) => r,
Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => {
// Windows delivers ICMP port-unreachable as ConnectionReset on UDP sockets
continue;
}
Err(e) => return Err(e.into()),
};
let ctx = Arc::clone(&ctx);
tokio::spawn(async move {
if let Err(e) = handle_query(buffer, len, src_addr, &ctx, Transport::Udp).await {
error!("{} | HANDLER ERROR | {}", src_addr, e);
}
});
}
}
async fn network_watch_loop(ctx: Arc<ServerCtx>) {
let mut tick: u64 = 0;
let mut interval = tokio::time::interval(Duration::from_secs(5));
interval.tick().await; // skip immediate tick
loop {
interval.tick().await;
tick += 1;
let mut changed = false;
// Check LAN IP change (every 5s — cheap, one UDP socket call)
if let Some(new_ip) = crate::lan::detect_lan_ip() {
let mut current_ip = ctx.lan_ip.lock().unwrap();
if new_ip != *current_ip {
info!("LAN IP changed: {} → {}", current_ip, new_ip);
*current_ip = new_ip;
changed = true;
crate::recursive::reset_udp_state();
}
}
// Re-detect upstream every 30s or on LAN IP change (auto-detect only)
if ctx.upstream_auto && (changed || tick.is_multiple_of(6)) {
let dns_info = crate::system_dns::discover_system_dns();
let new_addr = dns_info
.default_upstream
.or_else(crate::system_dns::detect_dhcp_dns)
.unwrap_or_else(|| QUAD9_IP.to_string());
let mut pool = ctx.upstream_pool.lock().unwrap();
if pool.maybe_update_primary(&new_addr, ctx.upstream_port) {
info!("upstream changed → {}", pool.label());
changed = true;
}
}
// Flush stale LAN peers on any network change
if changed {
ctx.lan_peers.lock().unwrap().clear();
info!("flushed LAN peers after network change");
}
// Re-probe UDP every 5 minutes when disabled
if tick.is_multiple_of(60) {
crate::recursive::probe_udp(&ctx.root_hints).await;
}
}
}
async fn load_blocklists(ctx: &ServerCtx, lists: &[String]) {
let downloaded = download_blocklists(lists).await;
// Parse outside the lock to avoid blocking DNS queries during parse (~100ms)
let mut all_domains = std::collections::HashSet::new();
let mut sources = Vec::new();
for (source, text) in &downloaded {
let domains = parse_blocklist(text);
info!("blocklist: {} domains from {}", domains.len(), source);
all_domains.extend(domains);
sources.push(source.clone());
}
let total = all_domains.len();
// Swap under lock — sub-microsecond
ctx.blocklist
.write()
.unwrap()
.swap_domains(all_domains, sources);
info!(
"blocking enabled: {} unique domains from {} lists",
total,
downloaded.len()
);
}
async fn warm_domain(ctx: &ServerCtx, domain: &str) {
for qtype in [
crate::question::QueryType::A,
crate::question::QueryType::AAAA,
] {
crate::ctx::refresh_entry(ctx, domain, qtype).await;
}
}
async fn doh_keepalive_loop(ctx: Arc<ServerCtx>) {
let mut interval = tokio::time::interval(Duration::from_secs(25));
interval.tick().await; // skip first immediate tick
loop {
interval.tick().await;
let pool = ctx.upstream_pool.lock().unwrap().clone();
if let Some(upstream) = pool.preferred() {
crate::forward::keepalive_doh(upstream).await;
}
}
}
async fn cache_warm_loop(ctx: Arc<ServerCtx>, domains: Vec<String>) {
tokio::time::sleep(Duration::from_secs(2)).await;
for domain in &domains {
warm_domain(&ctx, domain).await;
}
info!("cache warm: {} domains resolved at startup", domains.len());
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.tick().await;
loop {
interval.tick().await;
for domain in &domains {
let refresh = ctx.cache.read().unwrap().needs_warm(domain);
if refresh {
warm_domain(&ctx, domain).await;
}
}
}
}

View File

@@ -60,7 +60,7 @@ pub async fn run() -> Result<(), String> {
if !api_reachable {
eprintln!();
eprintln!(
" \x1b[1;38;2;192;98;58mNuma\x1b[0m — mobile API is not reachable on port {}.",
" \x1b[1;38;5;166mNuma\x1b[0m — mobile API is not reachable on port {}.",
SETUP_PORT
);
eprintln!();
@@ -77,7 +77,7 @@ pub async fn run() -> Result<(), String> {
let qr = render_qr(&url)?;
eprintln!();
eprintln!(" \x1b[1;38;2;192;98;58mNuma Phone Setup\x1b[0m");
eprintln!(" \x1b[1;38;5;166mNuma Phone Setup\x1b[0m");
eprintln!();
eprintln!(" Profile URL: \x1b[36m{}\x1b[0m", url);
eprintln!();

View File

@@ -90,6 +90,7 @@ fn linux_rss() -> usize {
pub struct ServerStats {
queries_total: u64,
queries_forwarded: u64,
queries_upstream: u64,
queries_recursive: u64,
queries_coalesced: u64,
queries_cached: u64,
@@ -127,7 +128,10 @@ impl Transport {
pub enum QueryPath {
Local,
Cached,
/// Matched a `[[forwarding]]` suffix rule.
Forwarded,
/// Resolved via the default `[upstream]` pool (no suffix match).
Upstream,
Recursive,
Coalesced,
Blocked,
@@ -141,6 +145,7 @@ impl QueryPath {
QueryPath::Local => "LOCAL",
QueryPath::Cached => "CACHED",
QueryPath::Forwarded => "FORWARD",
QueryPath::Upstream => "UPSTREAM",
QueryPath::Recursive => "RECURSIVE",
QueryPath::Coalesced => "COALESCED",
QueryPath::Blocked => "BLOCKED",
@@ -156,6 +161,8 @@ impl QueryPath {
Some(QueryPath::Cached)
} else if s.eq_ignore_ascii_case("FORWARD") {
Some(QueryPath::Forwarded)
} else if s.eq_ignore_ascii_case("UPSTREAM") {
Some(QueryPath::Upstream)
} else if s.eq_ignore_ascii_case("RECURSIVE") {
Some(QueryPath::Recursive)
} else if s.eq_ignore_ascii_case("COALESCED") {
@@ -183,6 +190,7 @@ impl ServerStats {
ServerStats {
queries_total: 0,
queries_forwarded: 0,
queries_upstream: 0,
queries_recursive: 0,
queries_coalesced: 0,
queries_cached: 0,
@@ -204,6 +212,7 @@ impl ServerStats {
QueryPath::Local => self.queries_local += 1,
QueryPath::Cached => self.queries_cached += 1,
QueryPath::Forwarded => self.queries_forwarded += 1,
QueryPath::Upstream => self.queries_upstream += 1,
QueryPath::Recursive => self.queries_recursive += 1,
QueryPath::Coalesced => self.queries_coalesced += 1,
QueryPath::Blocked => self.queries_blocked += 1,
@@ -232,6 +241,7 @@ impl ServerStats {
uptime_secs: self.uptime_secs(),
total: self.queries_total,
forwarded: self.queries_forwarded,
upstream: self.queries_upstream,
recursive: self.queries_recursive,
coalesced: self.queries_coalesced,
cached: self.queries_cached,
@@ -253,10 +263,11 @@ impl ServerStats {
let secs = uptime.as_secs() % 60;
log::info!(
"STATS | uptime {}h{}m{}s | total {} | fwd {} | recursive {} | coalesced {} | cached {} | local {} | override {} | blocked {} | errors {}",
"STATS | uptime {}h{}m{}s | total {} | fwd {} | upstream {} | recursive {} | coalesced {} | cached {} | local {} | override {} | blocked {} | errors {}",
hours, mins, secs,
self.queries_total,
self.queries_forwarded,
self.queries_upstream,
self.queries_recursive,
self.queries_coalesced,
self.queries_cached,
@@ -272,6 +283,7 @@ pub struct StatsSnapshot {
pub uptime_secs: u64,
pub total: u64,
pub forwarded: u64,
pub upstream: u64,
pub recursive: u64,
pub coalesced: u64,
pub cached: u64,

View File

@@ -89,7 +89,7 @@ pub fn try_port53_advisory(bind_addr: &str, err: &std::io::Error) -> Option<Stri
),
_ => return None,
};
let o = "\x1b[1;38;2;192;98;58m"; // bold orange
let o = "\x1b[1;38;5;166m"; // bold orange
let r = "\x1b[0m";
Some(format!(
"
@@ -211,7 +211,7 @@ fn discover_macos() -> SystemDnsInfo {
}
// Sort longest suffix first for most-specific matching
rules.sort_by(|a, b| b.suffix.len().cmp(&a.suffix.len()));
rules.sort_by_key(|r| std::cmp::Reverse(r.suffix.len()));
for rule in &rules {
info!(
@@ -572,7 +572,7 @@ fn windows_backup_path() -> std::path::PathBuf {
#[cfg(windows)]
fn disable_dnscache() -> Result<bool, String> {
// Check if Dnscache is running (it holds port 53 at kernel level)
// Check if Dnscache is running (it can hold port 53)
let output = std::process::Command::new("sc")
.args(["query", "Dnscache"])
.output()
@@ -603,8 +603,16 @@ fn disable_dnscache() -> Result<bool, String> {
return Err("failed to disable Dnscache via registry (run as Administrator?)".into());
}
eprintln!(" Dnscache disabled. A reboot is required to free port 53.");
Ok(true)
// Dnscache is disabled for next boot. Check whether port 53 is
// actually blocked right now — on many Windows configurations
// Dnscache doesn't bind port 53 even while running.
let port_blocked = std::net::UdpSocket::bind("127.0.0.1:53").is_err();
if port_blocked {
eprintln!(" Dnscache disabled. A reboot is required to free port 53.");
} else {
eprintln!(" Dnscache disabled. Port 53 is free.");
}
Ok(port_blocked)
}
#[cfg(windows)]
@@ -671,6 +679,83 @@ fn install_windows() -> Result<(), String> {
std::fs::write(&path, json).map_err(|e| format!("failed to write backup: {}", e))?;
}
// On re-install, stop the running service first so the binary can be
// overwritten and port 53 is released for the Dnscache probe.
if is_service_registered() {
eprintln!(" Stopping existing service...");
stop_service_scm();
}
let needs_reboot = disable_dnscache()?;
// Copy the binary to a stable path under ProgramData and register it
// as a real Windows service (SCM-managed, boot-time, auto-restart).
let service_exe = install_service_binary()?;
register_service_scm(&service_exe)?;
if needs_reboot {
// Dnscache still holds port 53 until reboot. Do NOT redirect DNS
// yet — nothing is listening on 127.0.0.1:53, so redirecting now
// would kill DNS. The service will call redirect_dns_to_localhost()
// on its first startup after reboot.
} else {
redirect_dns_with_interfaces(&interfaces)?;
match start_service_scm() {
Ok(_) => eprintln!(" Service started."),
Err(e) => eprintln!(
" warning: service registered but could not start now: {}",
e
),
}
}
eprintln!();
if !has_useful_existing {
eprintln!(" Original DNS saved to {}", path.display());
}
eprintln!(" Run 'numa uninstall' to restore.\n");
if needs_reboot {
eprintln!(" *** Reboot required. Numa will start automatically. ***\n");
} else {
eprintln!(" Numa is running.\n");
}
print_recursive_hint();
Ok(())
}
/// Stable install location for the service binary. SCM keeps a handle to
/// this path; the user's Downloads folder (where `current_exe()` points at
/// install time) is not durable.
#[cfg(windows)]
fn windows_service_exe_path() -> std::path::PathBuf {
crate::data_dir().join("bin").join("numa.exe")
}
/// Run `sc.exe` with the given args and return its merged stdout/stderr on
/// failure. `sc` emits errors on stdout (not stderr) on Windows, so the
/// caller reads stdout to format a useful error.
#[cfg(windows)]
fn run_sc(args: &[&str]) -> Result<std::process::Output, String> {
let out = std::process::Command::new("sc")
.args(args)
.output()
.map_err(|e| format!("failed to run sc {}: {}", args.first().unwrap_or(&""), e))?;
Ok(out)
}
/// Point all active network interfaces at 127.0.0.1 so Numa handles DNS.
/// Called from the service on first boot after a reboot that freed Dnscache.
#[cfg(windows)]
pub fn redirect_dns_to_localhost() -> Result<(), String> {
let interfaces = get_windows_interfaces()?;
redirect_dns_with_interfaces(&interfaces)
}
#[cfg(windows)]
fn redirect_dns_with_interfaces(
interfaces: &std::collections::HashMap<String, WindowsInterfaceDns>,
) -> Result<(), String> {
for name in interfaces.keys() {
let status = std::process::Command::new("netsh")
.args([
@@ -695,63 +780,184 @@ fn install_windows() -> Result<(), String> {
);
}
}
let needs_reboot = disable_dnscache()?;
register_autostart();
eprintln!();
if !has_useful_existing {
eprintln!(" Original DNS saved to {}", path.display());
}
eprintln!(" Run 'numa uninstall' to restore.\n");
if needs_reboot {
eprintln!(" *** Reboot required. Numa will start automatically. ***\n");
} else {
eprintln!(" Numa will start automatically on next boot.\n");
}
print_recursive_hint();
Ok(())
}
/// Register numa to auto-start on boot via registry Run key.
/// Copy the currently-running binary to the service install location. SCM
/// keeps a handle to this path, so it must be stable across user sessions.
#[cfg(windows)]
fn register_autostart() {
let exe = std::env::current_exe()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|_| "numa".into());
let _ = std::process::Command::new("reg")
.args([
"add",
"HKLM\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Run",
"/v",
"Numa",
"/t",
"REG_SZ",
"/d",
&exe,
"/f",
])
.status();
eprintln!(" Registered auto-start on boot.");
fn install_service_binary() -> Result<std::path::PathBuf, String> {
let src = std::env::current_exe().map_err(|e| format!("current_exe(): {}", e))?;
let dst = windows_service_exe_path();
if let Some(parent) = dst.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("failed to create {}: {}", parent.display(), e))?;
}
// Copy only if source and destination differ; running the binary from
// its install location is a supported (re-install) case.
if src != dst {
std::fs::copy(&src, &dst).map_err(|e| {
format!(
"failed to copy {} -> {}: {}",
src.display(),
dst.display(),
e
)
})?;
}
Ok(dst)
}
/// Remove numa auto-start registry key.
/// Remove the service binary on uninstall. Ignore failures — the service
/// is already deleted; a leftover file in ProgramData is not a hard error.
#[cfg(windows)]
fn remove_autostart() {
let _ = std::process::Command::new("reg")
.args([
"delete",
"HKLM\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Run",
"/v",
"Numa",
"/f",
])
.status();
fn remove_service_binary() {
let _ = std::fs::remove_file(windows_service_exe_path());
}
/// Register numa with the Service Control Manager, boot-time auto-start,
/// LocalSystem context, with a failure policy of restart-after-5s.
#[cfg(windows)]
fn register_service_scm(exe: &std::path::Path) -> Result<(), String> {
let bin_path = format!("\"{}\" --service", exe.display());
let name = crate::windows_service::SERVICE_NAME;
// sc.exe uses a leading space as its `name= value` delimiter; the space
// after `=` is mandatory.
let create = run_sc(&[
"create",
name,
"binPath=",
&bin_path,
"DisplayName=",
"Numa DNS",
"start=",
"auto",
"obj=",
"LocalSystem",
])?;
if !create.status.success() {
let out = String::from_utf8_lossy(&create.stdout);
// "service already exists" is 1073 — treat as idempotent success.
if !out.contains("1073") {
return Err(format!("sc create failed: {}", out.trim()));
}
}
let _ = run_sc(&[
"description",
name,
"Self-sovereign DNS resolver (ad blocking, DoH/DoT, local zones).",
]);
// Restart on crash: 5s, 5s, 10s; reset failure counter after 60s.
let _ = run_sc(&[
"failure",
name,
"reset=",
"60",
"actions=",
"restart/5000/restart/5000/restart/10000",
]);
eprintln!(" Registered service '{}' (boot-time).", name);
Ok(())
}
/// Start the service. Safe to call on a freshly-registered service — SCM
/// will fail with 1056 ("already running") or 1058 ("disabled") and we
/// return the underlying error string rather than masking it.
#[cfg(windows)]
fn start_service_scm() -> Result<(), String> {
let out = run_sc(&["start", crate::windows_service::SERVICE_NAME])?;
if !out.status.success() {
let text = String::from_utf8_lossy(&out.stdout);
if text.contains("1056") {
return Ok(()); // already running
}
return Err(format!("sc start failed: {}", text.trim()));
}
Ok(())
}
/// Stop the service and wait for it to fully exit. Idempotent —
/// already-stopped or missing service is not an error.
#[cfg(windows)]
fn stop_service_scm() {
let name = crate::windows_service::SERVICE_NAME;
let _ = run_sc(&["stop", name]);
// Wait up to 10s for the service to reach STOPPED state so the
// binary file handle is released before we try to overwrite it.
for _ in 0..20 {
if let Ok(out) = run_sc(&["query", name]) {
let text = String::from_utf8_lossy(&out.stdout);
if text.contains("STOPPED") || text.contains("1060") {
return;
}
}
std::thread::sleep(std::time::Duration::from_millis(500));
}
eprintln!(" warning: service did not stop within 10s");
}
/// Remove the service from SCM. Idempotent — see `stop_service_scm`.
#[cfg(windows)]
fn delete_service_scm() {
if let Err(e) = run_sc(&["delete", crate::windows_service::SERVICE_NAME]) {
log::warn!("sc delete failed: {}", e);
}
}
/// Check whether the service is registered with SCM (regardless of state).
#[cfg(windows)]
fn is_service_registered() -> bool {
run_sc(&["query", crate::windows_service::SERVICE_NAME])
.map(|o| parse_sc_registered(o.status.success(), &String::from_utf8_lossy(&o.stdout)))
.unwrap_or(false)
}
/// Parse `sc query` output to determine if a service is registered.
/// Extracted for testability — the actual `sc` call is in `is_service_registered`.
#[cfg(any(windows, test))]
fn parse_sc_registered(exit_success: bool, stdout: &str) -> bool {
if exit_success {
return true;
}
// Error 1060 = "The specified service does not exist as an installed service."
!stdout.contains("1060")
}
/// Print service state from SCM.
#[cfg(windows)]
fn service_status_windows() -> Result<(), String> {
let out = run_sc(&["query", crate::windows_service::SERVICE_NAME])?;
let text = String::from_utf8_lossy(&out.stdout);
let display = parse_sc_state(&text);
eprintln!(" {}\n", display);
Ok(())
}
/// Parse the STATE line from `sc query` output. Returns a human-readable
/// string like "STATE : 4 RUNNING" or "Service is not installed."
#[cfg(any(windows, test))]
fn parse_sc_state(sc_output: &str) -> String {
if sc_output.contains("1060") {
return "Service is not installed.".to_string();
}
sc_output
.lines()
.find(|l| l.contains("STATE"))
.map(|l| l.trim().to_string())
.unwrap_or_else(|| "unknown".to_string())
}
#[cfg(windows)]
fn uninstall_windows() -> Result<(), String> {
remove_autostart();
// Stop + remove the service before touching DNS, so port 53 is released
// cleanly and the failure-restart policy doesn't resurrect it.
stop_service_scm();
delete_service_scm();
remove_service_binary();
let path = windows_backup_path();
let json = std::fs::read_to_string(&path)
.map_err(|e| format!("no backup found at {}: {}", path.display(), e))?;
@@ -1048,6 +1254,62 @@ pub fn install_service() -> Result<(), String> {
result
}
/// Start the service. If already installed, just starts it via the platform
/// service manager. If not installed, falls through to a full install.
pub fn start_service() -> Result<(), String> {
#[cfg(target_os = "macos")]
{
install_service()
}
#[cfg(target_os = "linux")]
{
install_service()
}
#[cfg(windows)]
{
if is_service_registered() {
start_service_scm()?;
eprintln!(" Service started.\n");
Ok(())
} else {
install_service()
}
}
#[cfg(not(any(target_os = "macos", target_os = "linux", windows)))]
{
Err("service start not supported on this OS".to_string())
}
}
/// Stop the service without uninstalling it.
pub fn stop_service() -> Result<(), String> {
#[cfg(target_os = "macos")]
{
uninstall_service()
}
#[cfg(target_os = "linux")]
{
uninstall_service()
}
#[cfg(windows)]
{
let out = run_sc(&["stop", crate::windows_service::SERVICE_NAME])?;
if !out.status.success() {
let text = String::from_utf8_lossy(&out.stdout);
// 1062 = not started, 1060 = does not exist
if !text.contains("1062") && !text.contains("1060") {
return Err(format!("sc stop failed: {}", text.trim()));
}
}
eprintln!(" Service stopped.\n");
Ok(())
}
#[cfg(not(any(target_os = "macos", target_os = "linux", windows)))]
{
Err("service stop not supported on this OS".to_string())
}
}
/// Uninstall the Numa system service.
pub fn uninstall_service() -> Result<(), String> {
let _ = untrust_ca();
@@ -1117,7 +1379,14 @@ pub fn restart_service() -> Result<(), String> {
eprintln!(" Service restarted → {}\n", version);
Ok(())
}
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
#[cfg(windows)]
{
stop_service_scm();
start_service_scm()?;
eprintln!(" Service restarted.\n");
Ok(())
}
#[cfg(not(any(target_os = "macos", target_os = "linux", windows)))]
{
Err("service restart not supported on this OS".to_string())
}
@@ -1133,7 +1402,11 @@ pub fn service_status() -> Result<(), String> {
{
service_status_linux()
}
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
#[cfg(windows)]
{
service_status_windows()
}
#[cfg(not(any(target_os = "macos", target_os = "linux", windows)))]
{
Err("service status not supported on this OS".to_string())
}
@@ -1867,4 +2140,57 @@ Wireless LAN adapter Wi-Fi:
let err = std::io::Error::from(std::io::ErrorKind::AddrInUse);
assert!(try_port53_advisory("not-an-address", &err).is_none());
}
#[test]
fn sc_query_running_service_is_registered() {
assert!(parse_sc_registered(true, ""));
}
#[test]
fn sc_query_stopped_service_is_registered() {
let output = "SERVICE_NAME: Numa\n TYPE: 10 WIN32_OWN\n STATE: 1 STOPPED\n";
assert!(parse_sc_registered(true, output));
}
#[test]
fn sc_query_missing_service_not_registered() {
let output = "[SC] EnumQueryServicesStatus:OpenService FAILED 1060:\n\nThe specified service does not exist as an installed service.\n";
assert!(!parse_sc_registered(false, output));
}
#[test]
fn sc_query_other_error_assumes_registered() {
// Permission denied or other errors — don't assume unregistered.
let output = "[SC] OpenService FAILED 5:\n\nAccess is denied.\n";
assert!(parse_sc_registered(false, output));
}
#[test]
fn parse_sc_state_running() {
let output = "SERVICE_NAME: Numa\n TYPE : 10 WIN32_OWN_PROCESS\n STATE : 4 RUNNING\n WIN32_EXIT_CODE : 0\n";
assert!(parse_sc_state(output).contains("RUNNING"));
}
#[test]
fn parse_sc_state_stopped() {
let output = "SERVICE_NAME: Numa\n TYPE : 10 WIN32_OWN_PROCESS\n STATE : 1 STOPPED\n";
assert!(parse_sc_state(output).contains("STOPPED"));
}
#[test]
fn parse_sc_state_not_installed() {
let output = "[SC] EnumQueryServicesStatus:OpenService FAILED 1060:\n\n";
assert_eq!(parse_sc_state(output), "Service is not installed.");
}
#[test]
fn parse_sc_state_empty_output() {
assert_eq!(parse_sc_state(""), "unknown");
}
#[cfg(windows)]
#[test]
fn windows_config_dir_equals_data_dir() {
assert_eq!(crate::config_dir(), crate::data_dir());
}
}

View File

@@ -49,7 +49,7 @@ pub fn try_data_dir_advisory(err: &crate::Error, data_dir: &Path) -> Option<Stri
if io_err.kind() != std::io::ErrorKind::PermissionDenied {
return None;
}
let o = "\x1b[1;38;2;192;98;58m";
let o = "\x1b[1;38;5;166m";
let r = "\x1b[0m";
Some(format!(
"

147
src/windows_service.rs Normal file
View File

@@ -0,0 +1,147 @@
//! Windows service wrapper.
//!
//! Lets the `numa.exe` binary act as a real Windows service registered with
//! the Service Control Manager (SCM). Invoked via `numa.exe --service` (the
//! form that `sc create … binPath=` uses).
//!
//! Interactive runs (`numa.exe`, `numa.exe run`, `numa.exe install`) do not
//! go through this module — they keep their existing console-attached
//! behaviour.
use std::ffi::OsString;
use std::sync::mpsc;
use std::time::Duration;
use windows_service::service::{
ServiceControl, ServiceControlAccept, ServiceExitCode, ServiceState, ServiceStatus, ServiceType,
};
use windows_service::service_control_handler::{self, ServiceControlHandlerResult};
use windows_service::{define_windows_service, service_dispatcher};
pub const SERVICE_NAME: &str = "Numa";
define_windows_service!(ffi_service_main, service_main);
/// Entry point the SCM hands control to after `StartServiceCtrlDispatcherW`.
/// Any panic here vanishes silently into the service host — log instead of
/// unwrapping.
fn service_main(_arguments: Vec<OsString>) {
if let Err(e) = run_service() {
log::error!("numa service exited with error: {:?}", e);
}
}
fn run_service() -> windows_service::Result<()> {
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>();
let event_handler = move |control_event| -> ServiceControlHandlerResult {
match control_event {
ServiceControl::Stop | ServiceControl::Shutdown => {
let _ = shutdown_tx.send(());
ServiceControlHandlerResult::NoError
}
ServiceControl::Interrogate => ServiceControlHandlerResult::NoError,
_ => ServiceControlHandlerResult::NotImplemented,
}
};
let status_handle = service_control_handler::register(SERVICE_NAME, event_handler)?;
status_handle.set_service_status(ServiceStatus {
service_type: ServiceType::OWN_PROCESS,
current_state: ServiceState::Running,
controls_accepted: ServiceControlAccept::STOP | ServiceControlAccept::SHUTDOWN,
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
wait_hint: Duration::default(),
process_id: None,
})?;
// Spin up a multi-threaded tokio runtime and run the server on it. A
// dedicated thread runs the runtime so this function can return cleanly
// once the SCM tells us to stop — we can't block the dispatcher thread
// forever without preventing graceful shutdown.
let config_path = service_config_path();
let (server_done_tx, server_done_rx) = mpsc::channel::<()>();
let server_thread = std::thread::spawn(move || {
let runtime = match tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::error!("failed to build tokio runtime: {}", e);
let _ = server_done_tx.send(());
return;
}
};
if let Err(e) = runtime.block_on(crate::serve::run(config_path)) {
log::error!("numa serve exited with error: {}", e);
}
let _ = server_done_tx.send(());
});
// Wait for the API to be ready, then ensure DNS points at localhost.
// On first boot after install (Dnscache was disabled, reboot freed
// port 53), the installer deferred the DNS redirect — do it now.
let api_up = (0..20).any(|i| {
if i > 0 {
std::thread::sleep(Duration::from_millis(500));
}
std::net::TcpStream::connect(("127.0.0.1", crate::config::DEFAULT_API_PORT)).is_ok()
});
if api_up {
if let Err(e) = crate::system_dns::redirect_dns_to_localhost() {
log::warn!("could not redirect DNS to localhost: {}", e);
}
} else {
log::error!("numa API did not start within 10s — DNS not redirected");
}
// Wait for either SCM stop or server termination.
loop {
if shutdown_rx.recv_timeout(Duration::from_millis(500)).is_ok() {
break;
}
if server_done_rx.try_recv().is_ok() {
break;
}
}
// The server's tokio runtime runs detached inside server_thread. Abandon
// it — the process is about to report Stopped and the SCM will terminate
// us if we linger. Future work: plumb a cancellation signal into
// serve::run() for a clean teardown of listeners and in-flight queries.
drop(server_thread);
status_handle.set_service_status(ServiceStatus {
service_type: ServiceType::OWN_PROCESS,
current_state: ServiceState::Stopped,
controls_accepted: ServiceControlAccept::empty(),
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
wait_hint: Duration::default(),
process_id: None,
})?;
Ok(())
}
/// Hand control to the SCM dispatcher. Blocks until the service stops.
/// Call only from the `--service` command path — interactive invocations
/// will hang here waiting for an SCM that isn't talking to them.
pub fn run_as_service() -> windows_service::Result<()> {
service_dispatcher::start(SERVICE_NAME, ffi_service_main)
}
/// Path to the config file used when running under SCM. SCM launches the
/// service with SYSTEM's working directory (usually `C:\Windows\System32`),
/// so a relative `numa.toml` lookup won't find anything meaningful.
fn service_config_path() -> String {
crate::data_dir()
.join("numa.toml")
.to_string_lossy()
.into_owned()
}