mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-05-27 23:49:35 +00:00
refactor(outbound): probe via xray burstObservatory instead of SOCKS round-trip
Replace the HTTP-mode outbound test that spun up a SOCKS inbound and ran an httptrace'd request from the Go client with a probe-only xray config: burstObservatory probes the target outbound directly and the result is read from xray's /debug/vars metrics endpoint. The probe lives inside xray, so the measured delay and failure reasons reflect what xray itself sees over the real proxy chain. Drops the DNS/Connect/TLS/TTFB breakdown (and statusCode) since the observatory snapshot only exposes total delay; the frontend popover is updated accordingly.
This commit is contained in:
@@ -101,10 +101,10 @@ function showSecurity(security?: string): boolean {
|
||||
return security === 'tls' || security === 'reality';
|
||||
}
|
||||
|
||||
function hasBreakdown(r: { endpoints?: unknown[]; ttfbMs?: number; tlsMs?: number; connectMs?: number; dnsMs?: number; statusCode?: number; error?: string } | null | undefined): boolean {
|
||||
function hasBreakdown(r: { endpoints?: unknown[]; error?: string } | null | undefined): boolean {
|
||||
if (!r) return false;
|
||||
if (r.endpoints?.length) return true;
|
||||
return !!(r.ttfbMs || r.tlsMs || r.connectMs || r.dnsMs || r.statusCode || r.error);
|
||||
return !!r.error;
|
||||
}
|
||||
|
||||
export default function OutboundsTab({
|
||||
@@ -335,11 +335,6 @@ export default function OutboundsTab({
|
||||
</div>
|
||||
{hasBreakdown(r) && (
|
||||
<>
|
||||
{r.ttfbMs ? <div>TTFB: {r.ttfbMs} ms</div> : null}
|
||||
{r.tlsMs ? <div>TLS: {r.tlsMs} ms</div> : null}
|
||||
{r.connectMs ? <div>Connect: {r.connectMs} ms</div> : null}
|
||||
{r.dnsMs ? <div>DNS: {r.dnsMs} ms</div> : null}
|
||||
{r.statusCode ? <div>HTTP {r.statusCode}</div> : null}
|
||||
{(r.endpoints || []).map((ep) => (
|
||||
<div key={ep.address} className="endpoint-row">
|
||||
<span className={ep.success ? 'dot-ok' : 'dot-fail'}>●</span>
|
||||
|
||||
@@ -54,11 +54,6 @@ export const OutboundTestResultSchema = z.object({
|
||||
delay: z.number().optional(),
|
||||
error: z.string().optional(),
|
||||
mode: z.string().optional(),
|
||||
ttfbMs: z.number().optional(),
|
||||
tlsMs: z.number().optional(),
|
||||
connectMs: z.number().optional(),
|
||||
dnsMs: z.number().optional(),
|
||||
statusCode: z.number().optional(),
|
||||
endpoints: z
|
||||
.array(
|
||||
z.object({
|
||||
|
||||
@@ -1,15 +1,10 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptrace"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
@@ -123,21 +118,14 @@ func (s *OutboundService) ResetOutboundTraffic(tag string) error {
|
||||
}
|
||||
|
||||
// TestOutboundResult represents the result of testing an outbound.
|
||||
// Delay/timing fields are in milliseconds. Endpoints is only populated for
|
||||
// TCP-mode probes; the HTTP-mode timing breakdown lives in DNSMs/ConnectMs/
|
||||
// TLSMs/TTFBMs (any of these can be 0 if the underlying step was skipped —
|
||||
// e.g. a non-TLS target leaves TLSMs at 0).
|
||||
// Delay is in milliseconds. Endpoints is only populated for TCP-mode
|
||||
// probes; HTTP mode reports the round-trip delay measured by xray's
|
||||
// burstObservatory probe.
|
||||
type TestOutboundResult struct {
|
||||
Success bool `json:"success"`
|
||||
Delay int64 `json:"delay"`
|
||||
Error string `json:"error,omitempty"`
|
||||
StatusCode int `json:"statusCode,omitempty"`
|
||||
Mode string `json:"mode,omitempty"`
|
||||
|
||||
DNSMs int64 `json:"dnsMs,omitempty"`
|
||||
ConnectMs int64 `json:"connectMs,omitempty"`
|
||||
TLSMs int64 `json:"tlsMs,omitempty"`
|
||||
TTFBMs int64 `json:"ttfbMs,omitempty"`
|
||||
Success bool `json:"success"`
|
||||
Delay int64 `json:"delay"`
|
||||
Error string `json:"error,omitempty"`
|
||||
Mode string `json:"mode,omitempty"`
|
||||
|
||||
Endpoints []TestEndpointResult `json:"endpoints,omitempty"`
|
||||
}
|
||||
@@ -370,6 +358,13 @@ func numAsInt(v any) int {
|
||||
return 0
|
||||
}
|
||||
|
||||
// testOutboundHTTP spins up a temporary xray instance whose only job is
|
||||
// to run a burstObservatory probe against the target outbound, then polls
|
||||
// xray's metrics /debug/vars endpoint until that outbound is reported
|
||||
// alive (success) or the deadline expires (failure). The probe lives
|
||||
// inside xray, so the measured delay and any failure reason reflect what
|
||||
// xray itself sees over the real proxy chain — no SOCKS round-trip on
|
||||
// the client side.
|
||||
func (s *OutboundService) testOutboundHTTP(outboundJSON string, testURL string, allOutboundsJSON string) (*TestOutboundResult, error) {
|
||||
if testURL == "" {
|
||||
testURL = "https://www.google.com/generate_204"
|
||||
@@ -406,12 +401,12 @@ func (s *OutboundService) testOutboundHTTP(outboundJSON string, testURL string,
|
||||
allOutbounds = []any{testOutbound}
|
||||
}
|
||||
|
||||
testPort, err := findAvailablePort()
|
||||
metricsPort, err := findAvailablePort()
|
||||
if err != nil {
|
||||
return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Failed to find available port: %v", err)}, nil
|
||||
}
|
||||
|
||||
testConfig := s.createTestConfig(outboundTag, allOutbounds, testPort)
|
||||
testConfig := s.createTestConfig(outboundTag, allOutbounds, metricsPort, testURL)
|
||||
|
||||
testConfigPath, err := createTestConfigPath()
|
||||
if err != nil {
|
||||
@@ -430,12 +425,12 @@ func (s *OutboundService) testOutboundHTTP(outboundJSON string, testURL string,
|
||||
return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Failed to start test xray instance: %v", err)}, nil
|
||||
}
|
||||
|
||||
if err := waitForPort(testPort, 3*time.Second); err != nil {
|
||||
if err := waitForPort(metricsPort, 5*time.Second); err != nil {
|
||||
if !testProcess.IsRunning() {
|
||||
result := testProcess.GetResult()
|
||||
return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Xray process exited: %s", result)}, nil
|
||||
}
|
||||
return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Xray failed to start listening: %v", err)}, nil
|
||||
return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Xray failed to start metrics listener: %v", err)}, nil
|
||||
}
|
||||
|
||||
if !testProcess.IsRunning() {
|
||||
@@ -443,22 +438,15 @@ func (s *OutboundService) testOutboundHTTP(outboundJSON string, testURL string,
|
||||
return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Xray process exited: %s", result)}, nil
|
||||
}
|
||||
|
||||
return s.testConnection(testPort, testURL)
|
||||
return pollObservatoryResult(testProcess, metricsPort, outboundTag, 12*time.Second), nil
|
||||
}
|
||||
|
||||
// createTestConfig creates a test config by copying all outbounds unchanged and adding
|
||||
// only the test inbound (SOCKS) and a route rule that sends traffic to the given outbound tag.
|
||||
func (s *OutboundService) createTestConfig(outboundTag string, allOutbounds []any, testPort int) *xray.Config {
|
||||
// Test inbound (SOCKS proxy) - only addition to inbounds
|
||||
testInbound := xray.InboundConfig{
|
||||
Tag: "test-inbound",
|
||||
Listen: json_util.RawMessage(`"127.0.0.1"`),
|
||||
Port: testPort,
|
||||
Protocol: "socks",
|
||||
Settings: json_util.RawMessage(`{"auth":"noauth","udp":true}`),
|
||||
}
|
||||
|
||||
// Outbounds: copy all, but set noKernelTun=true for WireGuard outbounds
|
||||
// createTestConfig builds a probe-only xray config: the original outbounds
|
||||
// are kept as-is so dialerProxy chains still resolve, a burstObservatory
|
||||
// is wired to probe the target tag, and a metrics listener exposes the
|
||||
// observatory snapshot via /debug/vars. No inbound or routing rules are
|
||||
// needed — burstObservatory issues the probe traffic itself.
|
||||
func (s *OutboundService) createTestConfig(outboundTag string, allOutbounds []any, metricsPort int, probeURL string) *xray.Config {
|
||||
processedOutbounds := make([]any, len(allOutbounds))
|
||||
for i, ob := range allOutbounds {
|
||||
outbound, ok := ob.(map[string]any)
|
||||
@@ -467,35 +455,37 @@ func (s *OutboundService) createTestConfig(outboundTag string, allOutbounds []an
|
||||
continue
|
||||
}
|
||||
if protocol, ok := outbound["protocol"].(string); ok && protocol == "wireguard" {
|
||||
// Set noKernelTun to true for WireGuard outbounds
|
||||
if settings, ok := outbound["settings"].(map[string]any); ok {
|
||||
settings["noKernelTun"] = true
|
||||
} else {
|
||||
// Create settings if it doesn't exist
|
||||
outbound["settings"] = map[string]any{
|
||||
"noKernelTun": true,
|
||||
}
|
||||
outbound["settings"] = map[string]any{"noKernelTun": true}
|
||||
}
|
||||
}
|
||||
processedOutbounds[i] = outbound
|
||||
}
|
||||
outboundsJSON, _ := json.Marshal(processedOutbounds)
|
||||
|
||||
// Create routing rule to route all traffic through test outbound
|
||||
routingRules := []map[string]any{
|
||||
{
|
||||
"type": "field",
|
||||
"outboundTag": outboundTag,
|
||||
"network": "tcp,udp",
|
||||
},
|
||||
}
|
||||
|
||||
routingJSON, _ := json.Marshal(map[string]any{
|
||||
"domainStrategy": "AsIs",
|
||||
"rules": routingRules,
|
||||
"rules": []any{},
|
||||
})
|
||||
|
||||
burstObservatoryJSON, _ := json.Marshal(map[string]any{
|
||||
"subjectSelector": []string{outboundTag},
|
||||
"pingConfig": map[string]any{
|
||||
"destination": probeURL,
|
||||
"interval": "1s",
|
||||
"connectivity": "",
|
||||
"timeout": "5s",
|
||||
"samplingCount": 1,
|
||||
},
|
||||
})
|
||||
|
||||
metricsJSON, _ := json.Marshal(map[string]any{
|
||||
"tag": "test-metrics",
|
||||
"listen": fmt.Sprintf("127.0.0.1:%d", metricsPort),
|
||||
})
|
||||
|
||||
// Disable logging for test process to avoid creating orphaned log files
|
||||
logConfig := map[string]any{
|
||||
"loglevel": "warning",
|
||||
"access": "none",
|
||||
@@ -504,107 +494,92 @@ func (s *OutboundService) createTestConfig(outboundTag string, allOutbounds []an
|
||||
}
|
||||
logJSON, _ := json.Marshal(logConfig)
|
||||
|
||||
// Create minimal config
|
||||
cfg := &xray.Config{
|
||||
LogConfig: json_util.RawMessage(logJSON),
|
||||
InboundConfigs: []xray.InboundConfig{
|
||||
testInbound,
|
||||
},
|
||||
OutboundConfigs: json_util.RawMessage(string(outboundsJSON)),
|
||||
RouterConfig: json_util.RawMessage(string(routingJSON)),
|
||||
Policy: json_util.RawMessage(`{}`),
|
||||
Stats: json_util.RawMessage(`{}`),
|
||||
LogConfig: json_util.RawMessage(logJSON),
|
||||
InboundConfigs: []xray.InboundConfig{},
|
||||
OutboundConfigs: json_util.RawMessage(string(outboundsJSON)),
|
||||
RouterConfig: json_util.RawMessage(string(routingJSON)),
|
||||
Policy: json_util.RawMessage(`{}`),
|
||||
Stats: json_util.RawMessage(`{}`),
|
||||
BurstObservatory: json_util.RawMessage(string(burstObservatoryJSON)),
|
||||
Metrics: json_util.RawMessage(string(metricsJSON)),
|
||||
}
|
||||
|
||||
return cfg
|
||||
}
|
||||
|
||||
// testConnection runs the actual HTTP probe through the local SOCKS proxy.
|
||||
// A warmup request seeds xray's DNS cache / handshake; then a fresh
|
||||
// transport runs the measured request so httptrace sees a real cold
|
||||
// connection and reports DNS/Connect/TLS/TTFB. Note that DNS and Connect
|
||||
// reflect *client → SOCKS-on-loopback*, not the remote target — those
|
||||
// happen inside xray and aren't visible to net/http. TLS and TTFB are
|
||||
// the meaningful breakdown values for a SOCKS-proxied HTTPS probe.
|
||||
func (s *OutboundService) testConnection(proxyPort int, testURL string) (*TestOutboundResult, error) {
|
||||
proxyURLStr := fmt.Sprintf("socks5://127.0.0.1:%d", proxyPort)
|
||||
proxyURLParsed, err := url.Parse(proxyURLStr)
|
||||
if err != nil {
|
||||
return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Invalid proxy URL: %v", err)}, nil
|
||||
// observatoryEntry mirrors the per-outbound shape published by xray's
|
||||
// observatory under /debug/vars.
|
||||
type observatoryEntry struct {
|
||||
Alive bool `json:"alive"`
|
||||
Delay int64 `json:"delay"`
|
||||
LastSeenTime int64 `json:"last_seen_time"`
|
||||
LastTryTime int64 `json:"last_try_time"`
|
||||
OutboundTag string `json:"outbound_tag"`
|
||||
}
|
||||
|
||||
// pollObservatoryResult repeatedly reads /debug/vars and returns as soon
|
||||
// as the target outbound reports alive=true. burstObservatory updates the
|
||||
// snapshot after each ping (interval=1s, timeout=5s), so a healthy
|
||||
// outbound usually surfaces within ~2s and the timeout caps the wait for
|
||||
// truly dead ones.
|
||||
func pollObservatoryResult(testProcess *xray.Process, metricsPort int, tag string, timeout time.Duration) *TestOutboundResult {
|
||||
url := fmt.Sprintf("http://127.0.0.1:%d/debug/vars", metricsPort)
|
||||
client := &http.Client{Timeout: 2 * time.Second}
|
||||
deadline := time.Now().Add(timeout)
|
||||
var lastEntry observatoryEntry
|
||||
var sawEntry bool
|
||||
for time.Now().Before(deadline) {
|
||||
if !testProcess.IsRunning() {
|
||||
result := testProcess.GetResult()
|
||||
return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Xray process exited: %s", result)}
|
||||
}
|
||||
entry, ok := fetchObservatoryEntry(client, url, tag)
|
||||
if ok {
|
||||
if entry.Alive {
|
||||
delay := entry.Delay
|
||||
if delay <= 0 {
|
||||
delay = 1
|
||||
}
|
||||
return &TestOutboundResult{Mode: "http", Success: true, Delay: delay}
|
||||
}
|
||||
lastEntry = entry
|
||||
sawEntry = true
|
||||
}
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
}
|
||||
|
||||
mkClient := func() *http.Client {
|
||||
return &http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
Transport: &http.Transport{
|
||||
Proxy: http.ProxyURL(proxyURLParsed),
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 5 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 1,
|
||||
IdleConnTimeout: 1 * time.Second,
|
||||
DisableCompression: true,
|
||||
},
|
||||
msg := "Probe timed out — outbound did not become reachable"
|
||||
if sawEntry && lastEntry.LastTryTime > 0 {
|
||||
msg = fmt.Sprintf("All probes failed (last attempt %ds ago)", time.Now().Unix()-lastEntry.LastTryTime)
|
||||
}
|
||||
return &TestOutboundResult{Mode: "http", Success: false, Error: msg}
|
||||
}
|
||||
|
||||
func fetchObservatoryEntry(client *http.Client, url, tag string) (observatoryEntry, bool) {
|
||||
resp, err := client.Get(url)
|
||||
if err != nil {
|
||||
return observatoryEntry{}, false
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return observatoryEntry{}, false
|
||||
}
|
||||
var payload struct {
|
||||
Observatory map[string]observatoryEntry `json:"observatory"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
|
||||
return observatoryEntry{}, false
|
||||
}
|
||||
if entry, ok := payload.Observatory[tag]; ok {
|
||||
return entry, true
|
||||
}
|
||||
for _, entry := range payload.Observatory {
|
||||
if entry.OutboundTag == tag {
|
||||
return entry, true
|
||||
}
|
||||
}
|
||||
|
||||
warmup := mkClient()
|
||||
warmupResp, err := warmup.Get(testURL)
|
||||
if err != nil {
|
||||
return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Request failed: %v", err)}, nil
|
||||
}
|
||||
io.Copy(io.Discard, warmupResp.Body)
|
||||
warmupResp.Body.Close()
|
||||
warmup.CloseIdleConnections()
|
||||
|
||||
var dnsStart, dnsDone, connectStart, connectDone, tlsStart, tlsDone, firstByte time.Time
|
||||
trace := &httptrace.ClientTrace{
|
||||
DNSStart: func(_ httptrace.DNSStartInfo) { dnsStart = time.Now() },
|
||||
DNSDone: func(_ httptrace.DNSDoneInfo) { dnsDone = time.Now() },
|
||||
ConnectStart: func(_, _ string) { connectStart = time.Now() },
|
||||
ConnectDone: func(_, _ string, _ error) { connectDone = time.Now() },
|
||||
TLSHandshakeStart: func() { tlsStart = time.Now() },
|
||||
TLSHandshakeDone: func(_ tls.ConnectionState, _ error) { tlsDone = time.Now() },
|
||||
GotFirstResponseByte: func() { firstByte = time.Now() },
|
||||
}
|
||||
|
||||
client := mkClient()
|
||||
defer client.CloseIdleConnections()
|
||||
ctx := httptrace.WithClientTrace(context.Background(), trace)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, testURL, nil)
|
||||
if err != nil {
|
||||
return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Request build failed: %v", err)}, nil
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
resp, err := client.Do(req)
|
||||
delay := time.Since(startTime).Milliseconds()
|
||||
if err != nil {
|
||||
return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Request failed: %v", err)}, nil
|
||||
}
|
||||
io.Copy(io.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
|
||||
out := &TestOutboundResult{
|
||||
Mode: "http",
|
||||
Success: true,
|
||||
Delay: delay,
|
||||
StatusCode: resp.StatusCode,
|
||||
}
|
||||
if !dnsStart.IsZero() && !dnsDone.IsZero() {
|
||||
out.DNSMs = dnsDone.Sub(dnsStart).Milliseconds()
|
||||
}
|
||||
if !connectStart.IsZero() && !connectDone.IsZero() {
|
||||
out.ConnectMs = connectDone.Sub(connectStart).Milliseconds()
|
||||
}
|
||||
if !tlsStart.IsZero() && !tlsDone.IsZero() {
|
||||
out.TLSMs = tlsDone.Sub(tlsStart).Milliseconds()
|
||||
}
|
||||
if !firstByte.IsZero() {
|
||||
out.TTFBMs = firstByte.Sub(startTime).Milliseconds()
|
||||
}
|
||||
return out, nil
|
||||
return observatoryEntry{}, false
|
||||
}
|
||||
|
||||
// waitForPort polls until the given TCP port is accepting connections or the timeout expires.
|
||||
|
||||
Reference in New Issue
Block a user