Add: per-hop and rdns for probe trace

This commit is contained in:
DaZuo0122
2026-01-17 12:51:41 +08:00
parent 7e87edb411
commit c538e31174
9 changed files with 809 additions and 73 deletions

View File

@@ -238,6 +238,8 @@ struct ProbeTraceArgs {
target: String,
#[arg(long, default_value_t = 30)]
max_hops: u8,
#[arg(long, default_value_t = 3)]
per_hop: u32,
#[arg(long, default_value_t = 800)]
timeout_ms: u64,
#[arg(long)]
@@ -245,6 +247,8 @@ struct ProbeTraceArgs {
#[arg(long, default_value_t = 33434)]
port: u16,
#[arg(long)]
rdns: bool,
#[arg(long)]
no_geoip: bool,
}
@@ -1076,9 +1080,25 @@ async fn handle_probe_trace(cli: &Cli, args: ProbeTraceArgs) -> i32 {
};
let result = if args.udp {
wtfnet_probe::udp_trace(&host, port, args.max_hops, args.timeout_ms).await
wtfnet_probe::udp_trace(
&host,
port,
args.max_hops,
args.timeout_ms,
args.per_hop,
args.rdns,
)
.await
} else {
wtfnet_probe::tcp_trace(&host, port, args.max_hops, args.timeout_ms).await
wtfnet_probe::tcp_trace(
&host,
port,
args.max_hops,
args.timeout_ms,
args.per_hop,
args.rdns,
)
.await
};
match result {
@@ -1088,7 +1108,30 @@ async fn handle_probe_trace(cli: &Cli, args: ProbeTraceArgs) -> i32 {
}
if cli.json {
let meta = Meta::new("wtfnet", env!("CARGO_PKG_VERSION"), false);
let command = CommandInfo::new("probe trace", vec![args.target]);
let mut command_args = vec![args.target];
if args.udp {
command_args.push("--udp".to_string());
}
if args.rdns {
command_args.push("--rdns".to_string());
}
if args.per_hop != 3 {
command_args.push("--per-hop".to_string());
command_args.push(args.per_hop.to_string());
}
if args.max_hops != 30 {
command_args.push("--max-hops".to_string());
command_args.push(args.max_hops.to_string());
}
if args.timeout_ms != 800 {
command_args.push("--timeout-ms".to_string());
command_args.push(args.timeout_ms.to_string());
}
if args.udp && args.port != 33434 {
command_args.push("--port".to_string());
command_args.push(args.port.to_string());
}
let command = CommandInfo::new("probe trace", command_args);
let envelope = CommandEnvelope::new(meta, command, report);
emit_json(cli, &envelope)
} else {
@@ -1102,17 +1145,30 @@ async fn handle_probe_trace(cli: &Cli, args: ProbeTraceArgs) -> i32 {
}
for hop in report.hops {
let geoip = hop.geoip.as_ref().map(format_geoip);
let addr = hop.addr.unwrap_or_else(|| "*".to_string());
let rtt = if let (Some(min), Some(avg), Some(max)) =
(hop.min_ms, hop.avg_ms, hop.max_ms)
{
format!("{min}/{avg:.1}/{max}ms")
} else {
"-".to_string()
};
let rdns = hop
.rdns
.as_ref()
.map(|value| format!(" rdns={value}"))
.unwrap_or_default();
let note = hop
.note
.as_ref()
.map(|value| format!(" note={value}"))
.unwrap_or_default();
let geoip = geoip
.map(|value| format!(" geoip={value}"))
.unwrap_or_default();
println!(
"ttl={} addr={} rtt={}ms {}{}",
hop.ttl,
hop.addr.unwrap_or_else(|| "*".to_string()),
hop.rtt_ms
.map(|v| v.to_string())
.unwrap_or_else(|| "-".to_string()),
hop.note.unwrap_or_default(),
geoip
.map(|value| format!(" geoip={value}"))
.unwrap_or_default()
"ttl={} addr={} loss={:.1}% rtt(min/avg/max)={}{}{}{}",
hop.ttl, addr, hop.loss_pct, rtt, rdns, note, geoip
);
}
ExitKind::Ok.code()

View File

@@ -15,3 +15,4 @@ libc = "0.2"
tokio-socks = "0.5"
url = "2"
tracing = "0.1"
hickory-resolver = { version = "0.24", features = ["system-config"] }

View File

@@ -13,10 +13,14 @@ use pnet::transport::{
use std::os::unix::io::AsRawFd;
use serde::{Deserialize, Serialize};
use socket2::{Domain, Protocol, Socket, Type};
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, SocketAddr};
#[cfg(unix)]
use std::mem::size_of_val;
use std::time::{Duration, Instant};
use hickory_resolver::config::{ResolverConfig, ResolverOpts};
use hickory_resolver::system_conf::read_system_conf;
use hickory_resolver::TokioAsyncResolver;
use thiserror::Error;
use tokio::net::{TcpStream, lookup_host};
use tokio::time::timeout;
@@ -93,6 +97,12 @@ pub struct TraceHop {
pub ttl: u8,
pub addr: Option<String>,
pub rtt_ms: Option<u128>,
pub rtt_samples: Vec<Option<u128>>,
pub min_ms: Option<u128>,
pub avg_ms: Option<f64>,
pub max_ms: Option<u128>,
pub loss_pct: f64,
pub rdns: Option<String>,
pub note: Option<String>,
pub geoip: Option<GeoIpRecord>,
}
@@ -105,6 +115,8 @@ pub struct TraceReport {
pub port: u16,
pub max_hops: u8,
pub timeout_ms: u64,
pub per_hop: u32,
pub rdns: bool,
pub protocol: String,
pub hops: Vec<TraceHop>,
}
@@ -308,6 +320,8 @@ pub async fn tcp_trace(
port: u16,
max_hops: u8,
timeout_ms: u64,
per_hop: u32,
rdns: bool,
) -> Result<TraceReport, ProbeError> {
debug!(
target,
@@ -321,37 +335,81 @@ pub async fn tcp_trace(
let socket_addr = SocketAddr::new(addr, port);
let timeout_dur = Duration::from_millis(timeout_ms);
let mut hops = Vec::new();
let mut rdns_lookup = if rdns {
Some(ReverseDns::new(timeout_dur)?)
} else {
None
};
for ttl in 1..=max_hops {
let addr = socket_addr;
let start = Instant::now();
let result =
tokio::task::spawn_blocking(move || tcp_connect_with_ttl(addr, ttl, timeout_dur))
.await
.map_err(|err| ProbeError::Io(err.to_string()))?;
debug!(ttl, per_hop, "probe tcp trace hop start");
let mut samples = Vec::new();
let mut last_error = None;
for _ in 0..per_hop.max(1) {
let addr = socket_addr;
let start = Instant::now();
let result =
tokio::task::spawn_blocking(move || tcp_connect_with_ttl(addr, ttl, timeout_dur))
.await
.map_err(|err| ProbeError::Io(err.to_string()))?;
match result {
Ok(()) => {
let rtt = start.elapsed().as_millis();
hops.push(TraceHop {
ttl,
addr: Some(socket_addr.ip().to_string()),
rtt_ms: Some(rtt),
note: None,
geoip: None,
});
break;
match result {
Ok(()) => {
let rtt = start.elapsed().as_millis();
debug!(ttl, rtt_ms = rtt, "probe tcp trace hop reply");
samples.push(Some(rtt));
}
Err(err) => {
let message = err.to_string();
debug!(ttl, error = %message, "probe tcp trace hop error");
last_error = Some(message);
samples.push(None);
}
}
Err(err) => {
let rtt = start.elapsed().as_millis();
hops.push(TraceHop {
ttl,
addr: None,
rtt_ms: Some(rtt),
note: Some(err.to_string()),
geoip: None,
});
}
let (min_ms, avg_ms, max_ms, loss_pct) = stats_from_samples(&samples);
let rtt_ms = avg_ms.map(|value| value.round() as u128);
let rdns_name = if rdns {
if let Some(lookup) = rdns_lookup.as_mut() {
lookup.lookup(socket_addr.ip()).await
} else {
None
}
} else {
None
};
let note = if loss_pct >= 100.0 {
last_error
} else {
None
};
hops.push(TraceHop {
ttl,
addr: Some(socket_addr.ip().to_string()),
rtt_ms,
rtt_samples: samples,
min_ms,
avg_ms,
max_ms,
loss_pct,
rdns: rdns_name,
note,
geoip: None,
});
debug!(
ttl,
loss_pct,
min_ms = ?min_ms,
avg_ms = ?avg_ms,
max_ms = ?max_ms,
"probe tcp trace hop summary"
);
if loss_pct < 100.0 {
break;
}
}
@@ -362,6 +420,8 @@ pub async fn tcp_trace(
port,
max_hops,
timeout_ms,
per_hop,
rdns,
protocol: "tcp".to_string(),
hops,
})
@@ -372,6 +432,8 @@ pub async fn udp_trace(
port: u16,
max_hops: u8,
timeout_ms: u64,
per_hop: u32,
rdns: bool,
) -> Result<TraceReport, ProbeError> {
debug!(
target,
@@ -385,37 +447,102 @@ pub async fn udp_trace(
let timeout_dur = Duration::from_millis(timeout_ms);
let mut hops = Vec::new();
let mut rdns_lookup = if rdns {
Some(ReverseDns::new(timeout_dur)?)
} else {
None
};
for ttl in 1..=max_hops {
let addr = SocketAddr::new(addr, port);
let start = Instant::now();
let result = tokio::task::spawn_blocking(move || udp_trace_hop(addr, ttl, timeout_dur))
.await
.map_err(|err| ProbeError::Io(err.to_string()))?;
debug!(ttl, per_hop, "probe udp trace hop start");
let mut samples = Vec::new();
let mut hop_addr = None;
let mut reached_any = false;
let mut last_error = None;
let mut addr_set = HashSet::new();
match result {
Ok((hop_addr, reached)) => {
let rtt = start.elapsed().as_millis();
hops.push(TraceHop {
ttl,
addr: hop_addr.map(|ip| ip.to_string()),
rtt_ms: Some(rtt),
note: None,
geoip: None,
});
if reached {
break;
for _ in 0..per_hop.max(1) {
let addr = SocketAddr::new(addr, port);
let start = Instant::now();
let result = tokio::task::spawn_blocking(move || udp_trace_hop(addr, ttl, timeout_dur))
.await
.map_err(|err| ProbeError::Io(err.to_string()))?;
match result {
Ok((addr, reached)) => {
let rtt = start.elapsed().as_millis();
debug!(
ttl,
addr = ?addr,
rtt_ms = rtt,
reached,
"probe udp trace hop reply"
);
samples.push(Some(rtt));
if let Some(ip) = addr {
addr_set.insert(ip);
if hop_addr.is_none() {
hop_addr = Some(ip);
}
}
if reached {
reached_any = true;
}
}
Err(err) => {
let message = err.to_string();
debug!(ttl, error = %message, "probe udp trace hop error");
last_error = Some(message);
samples.push(None);
}
}
Err(err) => {
hops.push(TraceHop {
ttl,
addr: None,
rtt_ms: None,
note: Some(err.to_string()),
geoip: None,
});
}
let (min_ms, avg_ms, max_ms, loss_pct) = stats_from_samples(&samples);
let rtt_ms = avg_ms.map(|value| value.round() as u128);
let rdns_name = if rdns {
if let (Some(ip), Some(lookup)) = (hop_addr, rdns_lookup.as_mut()) {
lookup.lookup(ip).await
} else {
None
}
} else {
None
};
let note = if loss_pct >= 100.0 {
last_error
} else if addr_set.len() > 1 {
Some("multiple hop addresses".to_string())
} else {
None
};
hops.push(TraceHop {
ttl,
addr: hop_addr.map(|ip| ip.to_string()),
rtt_ms,
rtt_samples: samples,
min_ms,
avg_ms,
max_ms,
loss_pct,
rdns: rdns_name,
note,
geoip: None,
});
debug!(
ttl,
loss_pct,
min_ms = ?min_ms,
avg_ms = ?avg_ms,
max_ms = ?max_ms,
reached_any,
"probe udp trace hop summary"
);
if reached_any {
break;
}
}
@@ -426,6 +553,8 @@ pub async fn udp_trace(
port,
max_hops,
timeout_ms,
per_hop,
rdns,
protocol: "udp".to_string(),
hops,
})
@@ -458,6 +587,35 @@ fn build_summary(
}
}
fn stats_from_samples(
samples: &[Option<u128>],
) -> (Option<u128>, Option<f64>, Option<u128>, f64) {
let mut min = None;
let mut max = None;
let mut sum = 0u128;
let mut received = 0u32;
for sample in samples {
if let Some(rtt) = sample {
received += 1;
min = Some(min.map_or(*rtt, |value: u128| value.min(*rtt)));
max = Some(max.map_or(*rtt, |value: u128| value.max(*rtt)));
sum += *rtt;
}
}
let sent = samples.len() as u32;
let loss_pct = if sent == 0 {
0.0
} else {
((sent - received) as f64 / sent as f64) * 100.0
};
let avg_ms = if received == 0 {
None
} else {
Some(sum as f64 / received as f64)
};
(min, avg_ms, max, loss_pct)
}
async fn resolve_one(target: &str) -> Result<IpAddr, ProbeError> {
let mut iter = lookup_host((target, 0))
.await
@@ -483,6 +641,40 @@ async fn resolve_one_prefer_ipv4(target: &str) -> Result<IpAddr, ProbeError> {
fallback.ok_or_else(|| ProbeError::Resolve("no address found".to_string()))
}
struct ReverseDns {
resolver: TokioAsyncResolver,
cache: HashMap<IpAddr, Option<String>>,
timeout: Duration,
}
impl ReverseDns {
fn new(timeout: Duration) -> Result<Self, ProbeError> {
let (config, opts) = match read_system_conf() {
Ok((config, opts)) => (config, opts),
Err(_) => (ResolverConfig::default(), ResolverOpts::default()),
};
let resolver = TokioAsyncResolver::tokio(config, opts);
Ok(Self {
resolver,
cache: HashMap::new(),
timeout,
})
}
async fn lookup(&mut self, ip: IpAddr) -> Option<String> {
if let Some(value) = self.cache.get(&ip) {
return value.clone();
}
let result = timeout(self.timeout, self.resolver.reverse_lookup(ip)).await;
let value = match result {
Ok(Ok(response)) => response.iter().next().map(|name| name.to_utf8()),
_ => None,
};
self.cache.insert(ip, value.clone());
value
}
}
struct Socks5Proxy {
addr: String,
remote_dns: bool,