Files
WTFnet/crates/wtfnet-dnsleak/src/sensor.rs
2026-01-17 19:07:10 +08:00

401 lines
12 KiB
Rust

use crate::classify::{classify_dns_query, ClassifiedEvent};
use crate::report::LeakTransport;
use crate::DnsLeakError;
use std::collections::HashSet;
use std::net::IpAddr;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tracing::debug;
use wtfnet_platform::FlowProtocol;
use crate::LeakWatchOptions;
#[cfg(feature = "pcap")]
use pnet::datalink::{self, Channel, Config as DatalinkConfig};
#[cfg(feature = "pcap")]
use std::sync::mpsc;
#[cfg(feature = "pcap")]
/// Upper bound, in milliseconds, on waiting for one interface's capture channel to open.
/// Also multiplied by the interface count to size the overall capture timeout.
const OPEN_IFACE_TIMEOUT_MS: u64 = 700;
#[cfg(feature = "pcap")]
/// How long, in milliseconds, the capture loop waits for the next frame before
/// re-checking its deadline.
const FRAME_RECV_TIMEOUT_MS: u64 = 200;
/// Fallback used when the crate is built without the `pcap` feature.
///
/// Packet capture is unavailable in this configuration, so the options are
/// ignored and the call always fails.
///
/// # Errors
/// Always returns [`DnsLeakError::NotSupported`].
#[cfg(not(feature = "pcap"))]
pub async fn capture_events(
    _options: &LeakWatchOptions,
) -> Result<Vec<ClassifiedEvent>, DnsLeakError> {
    let reason = String::from("dns leak watch requires pcap feature");
    Err(DnsLeakError::NotSupported(reason))
}
/// Captures DNS-related traffic for `options.duration_ms` milliseconds and
/// returns the classified events.
///
/// The blocking capture loop is run via `tokio::task::spawn_blocking`. The
/// await on it is capped at the capture duration, plus an interface-selection
/// budget (`OPEN_IFACE_TIMEOUT_MS` per enumerated interface, at least one),
/// plus a fixed 2000 ms of slack. All sums saturate instead of overflowing.
///
/// # Errors
/// Returns [`DnsLeakError::Io`] when the blocking task cannot be joined or the
/// overall timeout elapses. Errors produced by the capture itself are passed
/// through unchanged.
#[cfg(feature = "pcap")]
pub async fn capture_events(
    options: &LeakWatchOptions,
) -> Result<Vec<ClassifiedEvent>, DnsLeakError> {
    let owned_options = options.clone();
    let interfaces = datalink::interfaces();
    // Rendered up front because it is only needed for the timeout message.
    let candidates = format_iface_list(&interfaces);

    let iface_count = interfaces.len().max(1) as u64;
    let overall_wait = Duration::from_millis(
        owned_options
            .duration_ms
            .saturating_add(iface_count.saturating_mul(OPEN_IFACE_TIMEOUT_MS))
            .saturating_add(2000),
    );

    let worker = tokio::task::spawn_blocking(move || capture_events_blocking(owned_options));
    let joined = match tokio::time::timeout(overall_wait, worker).await {
        Ok(joined) => joined,
        Err(_) => {
            return Err(DnsLeakError::Io(format!(
                "capture timed out waiting for interface; candidates: {candidates}"
            )));
        }
    };
    // A join failure becomes an I/O error; the inner capture result is returned as-is.
    joined.map_err(|err| DnsLeakError::Io(err.to_string()))?
}
/// Outcome of attempting to open a capture channel on one network interface,
/// as produced by `iface_diagnostics`.
#[derive(Debug, Clone)]
pub struct IfaceDiag {
/// Name of the interface that was probed.
pub name: String,
/// `true` when a capture channel could be opened on the interface.
pub open_ok: bool,
/// Failure description when the open failed; `"-"` when it succeeded.
pub error: String,
}
/// Fallback used when the crate is built without the `pcap` feature.
///
/// # Errors
/// Always returns [`DnsLeakError::NotSupported`], because interfaces cannot be
/// probed without packet-capture support.
#[cfg(not(feature = "pcap"))]
pub fn iface_diagnostics() -> Result<Vec<IfaceDiag>, DnsLeakError> {
    let reason = String::from("dns leak watch requires pcap feature");
    Err(DnsLeakError::NotSupported(reason))
}
/// Probes every enumerated interface by trying to open a capture channel on it.
///
/// Each interface yields one [`IfaceDiag`]: `open_ok` is `true` with the
/// placeholder error `"-"` on success, or `false` with the failure text
/// otherwise. Channels that open successfully are dropped right away.
///
/// # Errors
/// This variant never returns `Err`; the `Result` mirrors the signature of the
/// non-`pcap` build.
#[cfg(feature = "pcap")]
pub fn iface_diagnostics() -> Result<Vec<IfaceDiag>, DnsLeakError> {
    let interfaces = datalink::interfaces();
    let mut config = DatalinkConfig::default();
    config.read_timeout = Some(Duration::from_millis(500));

    let report = interfaces
        .into_iter()
        .map(|iface| {
            // The interface is moved into the probe, so keep its name for the report.
            let name = iface.name.clone();
            match open_channel_with_timeout(iface, &config) {
                Ok(_) => IfaceDiag {
                    name,
                    open_ok: true,
                    error: "-".to_string(),
                },
                Err(error) => IfaceDiag {
                    name,
                    open_ok: false,
                    error,
                },
            }
        })
        .collect();
    Ok(report)
}
/// Runs the blocking capture loop and returns the de-duplicated DNS events.
///
/// An interface is selected (honouring `options.iface` when set) and a helper
/// thread pumps raw frames from it into a channel. This function drains that
/// channel until `options.duration_ms` has elapsed, parsing each Ethernet frame
/// as IPv4 or IPv6 and keeping events whose source address belongs to the
/// selected interface. Events are de-duplicated on transport plus the
/// source/destination address and port.
///
/// The reader thread is bounded by the same deadline as the consumer loop, so
/// it releases the capture handle once the capture window is over instead of
/// looping forever on a quiet interface. Read errors other than timeouts are
/// followed by a short pause so a failing interface cannot spin a core.
///
/// # Errors
/// Returns [`DnsLeakError`] when no capture channel can be opened.
#[cfg(feature = "pcap")]
fn capture_events_blocking(options: LeakWatchOptions) -> Result<Vec<ClassifiedEvent>, DnsLeakError> {
    use pnet::packet::ethernet::{EtherTypes, EthernetPacket};
    use pnet::packet::Packet;

    // Pause applied after a non-timeout read error to avoid a hot retry loop.
    const READ_ERROR_BACKOFF: Duration = Duration::from_millis(20);

    let mut config = DatalinkConfig::default();
    config.read_timeout = Some(Duration::from_millis(500));
    let (iface, mut rx) = select_interface(options.iface.as_deref(), &config)?;
    let local_ips = iface.ips.iter().map(|ip| ip.ip()).collect::<Vec<_>>();
    let iface_name = iface.name.clone();

    // Shared by the reader thread and the consumer loop so both stop together.
    let deadline = Instant::now() + Duration::from_millis(options.duration_ms);

    let (frame_tx, frame_rx) = mpsc::channel();
    std::thread::spawn(move || {
        while Instant::now() < deadline {
            match rx.next() {
                Ok(frame) => {
                    // The consumer has gone away; nothing left to do.
                    if frame_tx.send(frame.to_vec()).is_err() {
                        break;
                    }
                }
                Err(err) => {
                    let transient = matches!(
                        err.kind(),
                        std::io::ErrorKind::TimedOut
                            | std::io::ErrorKind::WouldBlock
                            | std::io::ErrorKind::Interrupted
                    );
                    if !transient {
                        std::thread::sleep(READ_ERROR_BACKOFF);
                    }
                }
            }
        }
    });

    let mut events = Vec::new();
    let mut seen = HashSet::new();
    while Instant::now() < deadline {
        let frame = match frame_rx.recv_timeout(Duration::from_millis(FRAME_RECV_TIMEOUT_MS)) {
            Ok(frame) => frame,
            Err(mpsc::RecvTimeoutError::Timeout) => continue,
            // The reader thread has exited, so no further frames can arrive.
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
        };
        let ethernet = match EthernetPacket::new(&frame) {
            Some(packet) => packet,
            None => continue,
        };
        let event = match ethernet.get_ethertype() {
            EtherTypes::Ipv4 => parse_ipv4(ethernet.payload(), &local_ips, &iface_name),
            EtherTypes::Ipv6 => parse_ipv6(ethernet.payload(), &local_ips, &iface_name),
            _ => None,
        };
        if let Some(event) = event {
            let key = format!(
                "{:?}|{}|{}|{}|{}",
                event.transport, event.src_ip, event.src_port, event.dst_ip, event.dst_port
            );
            // Only the first event for a given flow is reported.
            if seen.insert(key) {
                debug!(
                    transport = ?event.transport,
                    src_ip = %event.src_ip,
                    src_port = event.src_port,
                    dst_ip = %event.dst_ip,
                    dst_port = event.dst_port,
                    "dns leak event"
                );
                events.push(event);
            }
        }
    }
    Ok(events)
}
/// Parses an IPv4 packet and classifies it as a DNS-related event if applicable.
///
/// Returns `None` when the buffer is not a valid IPv4 packet, when the source
/// address is not one of `local_ips`, when the packet is a non-first fragment,
/// or when the next-level protocol is neither UDP nor TCP. Otherwise the
/// result of `parse_udp` / `parse_tcp` on the IPv4 payload is returned.
#[cfg(feature = "pcap")]
fn parse_ipv4(
    payload: &[u8],
    local_ips: &[IpAddr],
    iface_name: &str,
) -> Option<ClassifiedEvent> {
    use pnet::packet::ip::IpNextHeaderProtocols;
    use pnet::packet::ipv4::Ipv4Packet;
    use pnet::packet::Packet;

    let ipv4 = Ipv4Packet::new(payload)?;
    let src = IpAddr::V4(ipv4.get_source());
    if !local_ips.contains(&src) {
        return None;
    }
    // Only the first fragment (offset 0) carries the UDP/TCP header. Later
    // fragments hold raw payload bytes, and reading ports from them would
    // produce bogus events.
    if ipv4.get_fragment_offset() != 0 {
        return None;
    }
    let dst = IpAddr::V4(ipv4.get_destination());
    match ipv4.get_next_level_protocol() {
        IpNextHeaderProtocols::Udp => parse_udp(src, dst, ipv4.payload(), iface_name),
        IpNextHeaderProtocols::Tcp => parse_tcp(src, dst, ipv4.payload(), iface_name),
        _ => None,
    }
}
/// Parses an IPv6 packet and classifies it as a DNS-related event if applicable.
///
/// Returns `None` when the buffer is not a valid IPv6 packet, when the source
/// address is not one of `local_ips`, or when the header's next-header field
/// is neither UDP nor TCP. Only that first next-header value is inspected.
#[cfg(feature = "pcap")]
fn parse_ipv6(
    payload: &[u8],
    local_ips: &[IpAddr],
    iface_name: &str,
) -> Option<ClassifiedEvent> {
    use pnet::packet::ip::IpNextHeaderProtocols;
    use pnet::packet::ipv6::Ipv6Packet;
    use pnet::packet::Packet;

    let packet = Ipv6Packet::new(payload)?;
    let source = IpAddr::V6(packet.get_source());
    let from_local_host = local_ips.iter().any(|ip| *ip == source);
    if !from_local_host {
        return None;
    }

    let next_header = packet.get_next_header();
    if next_header == IpNextHeaderProtocols::Udp {
        let destination = IpAddr::V6(packet.get_destination());
        parse_udp(source, destination, packet.payload(), iface_name)
    } else if next_header == IpNextHeaderProtocols::Tcp {
        let destination = IpAddr::V6(packet.get_destination());
        parse_tcp(source, destination, packet.payload(), iface_name)
    } else {
        None
    }
}
/// Builds a `Udp53` event from a UDP datagram addressed to port 53.
///
/// Returns `None` when the buffer is not a valid UDP datagram, when the
/// destination port is not 53, or when `classify_dns_query` rejects the UDP
/// payload. On success the event carries the query name, type and rcode
/// returned by the classifier, stamped with the current wall-clock time.
#[cfg(feature = "pcap")]
fn parse_udp(
    src_ip: IpAddr,
    dst_ip: IpAddr,
    payload: &[u8],
    iface_name: &str,
) -> Option<ClassifiedEvent> {
    use pnet::packet::udp::UdpPacket;
    use pnet::packet::Packet;

    let datagram = UdpPacket::new(payload)?;
    let dst_port = datagram.get_destination();
    match dst_port {
        53 => {}
        _ => return None,
    }

    classify_dns_query(datagram.payload()).map(|(qname, qtype, rcode)| ClassifiedEvent {
        timestamp_ms: now_ms(),
        proto: FlowProtocol::Udp,
        src_ip,
        src_port: datagram.get_source(),
        dst_ip,
        dst_port,
        iface_name: Some(iface_name.to_string()),
        transport: LeakTransport::Udp53,
        qname: Some(qname),
        qtype: Some(qtype),
        rcode: Some(rcode),
    })
}
/// Builds an event from a TCP segment addressed to a DNS-related port.
///
/// Destination port 53 maps to `LeakTransport::Tcp53` and 853 to
/// `LeakTransport::Dot`; any other port, or a buffer that is not a valid TCP
/// segment, yields `None`. The TCP payload is not inspected, so the query
/// name, type and rcode are always `None`.
#[cfg(feature = "pcap")]
fn parse_tcp(
    src_ip: IpAddr,
    dst_ip: IpAddr,
    payload: &[u8],
    iface_name: &str,
) -> Option<ClassifiedEvent> {
    use pnet::packet::tcp::TcpPacket;

    let segment = TcpPacket::new(payload)?;
    let dst_port = segment.get_destination();
    let transport = if dst_port == 53 {
        LeakTransport::Tcp53
    } else if dst_port == 853 {
        LeakTransport::Dot
    } else {
        return None;
    };

    let event = ClassifiedEvent {
        timestamp_ms: now_ms(),
        proto: FlowProtocol::Tcp,
        src_ip,
        src_port: segment.get_source(),
        dst_ip,
        dst_port,
        iface_name: Some(iface_name.to_string()),
        transport,
        qname: None,
        qtype: None,
        rcode: None,
    };
    Some(event)
}
/// Picks an interface and opens a capture channel on it.
///
/// With `name` set, only the interface with exactly that name is tried. With
/// `name` unset, interfaces are tried in the order given by
/// `order_interfaces` and the first one that opens is returned.
///
/// # Errors
/// Returns [`DnsLeakError::Io`] when the requested interface does not exist,
/// when it cannot be opened, or when no interface can be opened at all. Every
/// message lists the enumerated interface names.
#[cfg(feature = "pcap")]
fn select_interface(
    name: Option<&str>,
    config: &DatalinkConfig,
) -> Result<(datalink::NetworkInterface, Box<dyn datalink::DataLinkReceiver>), DnsLeakError> {
    let interfaces = datalink::interfaces();

    let name = match name {
        Some(name) => name,
        None => {
            // No explicit request: walk the ranked candidates until one opens.
            for candidate in order_interfaces(&interfaces) {
                debug!("dns leak iface pick: try={}", candidate.name);
                match open_channel_with_timeout(candidate, config) {
                    Ok(channel) => return Ok(channel),
                    Err(_) => {}
                }
            }
            return Err(DnsLeakError::Io(format!(
                "no suitable interface found; candidates: {}",
                format_iface_list(&interfaces)
            )));
        }
    };

    debug!("dns leak iface pick: requested={name}");
    let requested = interfaces.iter().find(|iface| iface.name == name).cloned();
    let iface = match requested {
        Some(iface) => iface,
        None => {
            return Err(DnsLeakError::Io(format!(
                "interface '{name}' not found; candidates: {}",
                format_iface_list(&interfaces)
            )));
        }
    };
    match open_channel_with_timeout(iface, config) {
        Ok(channel) => Ok(channel),
        Err(err) => Err(DnsLeakError::Io(format!(
            "failed to open capture on interface ({err}); candidates: {}",
            format_iface_list(&interfaces)
        ))),
    }
}
/// Opens an Ethernet capture channel on `iface`, waiting at most
/// `OPEN_IFACE_TIMEOUT_MS` milliseconds.
///
/// The open call runs on a helper thread so this function can stop waiting
/// after the timeout. If the helper finishes later, its result is discarded.
///
/// # Errors
/// Returns a description of the failure: the underlying open error,
/// `"unsupported channel"` for a non-Ethernet channel, or
/// `"timeout opening capture"` when no result arrives in time.
#[cfg(feature = "pcap")]
fn open_channel_with_timeout(
    iface: datalink::NetworkInterface,
    config: &DatalinkConfig,
) -> Result<(datalink::NetworkInterface, Box<dyn datalink::DataLinkReceiver>), String> {
    let (result_tx, result_rx) = mpsc::channel();
    let thread_config = config.clone();
    std::thread::spawn(move || {
        let opened = datalink::channel(&iface, thread_config)
            .map_err(|err| err.to_string())
            .and_then(|channel| match channel {
                Channel::Ethernet(_, receiver) => Ok(receiver),
                _ => Err("unsupported channel".to_string()),
            });
        // The caller may have stopped waiting; a failed send is fine.
        let _ = result_tx.send((iface, opened));
    });

    match result_rx.recv_timeout(Duration::from_millis(OPEN_IFACE_TIMEOUT_MS)) {
        Ok((iface, opened)) => opened.map(|receiver| (iface, receiver)),
        Err(_) => Err("timeout opening capture".to_string()),
    }
}
#[cfg(feature = "pcap")]
/// Reports whether an interface name looks like a Wi-Fi or wired Ethernet adapter.
///
/// The comparison is ASCII case-insensitive and matches when the name contains
/// any of the fragments `wlan`, `wifi`, `wi-fi`, `ethernet`, `eth` or `lan`.
fn is_named_fallback(name: &str) -> bool {
    const FRAGMENTS: [&str; 6] = ["wlan", "wifi", "wi-fi", "ethernet", "eth", "lan"];
    let lowered = name.to_ascii_lowercase();
    FRAGMENTS.iter().any(|fragment| lowered.contains(*fragment))
}
/// Ranks interfaces for automatic selection.
///
/// Loopback interfaces are dropped. Of the rest, those whose name matches
/// `is_named_fallback` or that have at least one address come first, followed
/// by the others; the original relative order is kept within each group. If
/// nothing remains, the unfiltered input list is returned.
#[cfg(feature = "pcap")]
fn order_interfaces(
    interfaces: &[datalink::NetworkInterface],
) -> Vec<datalink::NetworkInterface> {
    let (mut ranked, rest): (
        Vec<datalink::NetworkInterface>,
        Vec<datalink::NetworkInterface>,
    ) = interfaces
        .iter()
        .filter(|iface| !iface.is_loopback())
        .cloned()
        .partition(|iface| is_named_fallback(&iface.name) || !iface.ips.is_empty());
    ranked.extend(rest);

    if ranked.is_empty() {
        // Only loopback (or nothing) was enumerated; fall back to the raw list.
        interfaces.to_vec()
    } else {
        ranked
    }
}
/// Renders interface names as a comma-separated list for diagnostics.
///
/// An empty slice is rendered as `"-"`.
#[cfg(feature = "pcap")]
fn format_iface_list(interfaces: &[datalink::NetworkInterface]) -> String {
    let names: Vec<&str> = interfaces
        .iter()
        .map(|iface| iface.name.as_str())
        .collect();
    if names.is_empty() {
        "-".to_string()
    } else {
        names.join(", ")
    }
}
#[cfg(feature = "pcap")]
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}