|
|
@@ -3,11 +3,29 @@ import requests
|
|
|
import base64
|
|
|
import json
|
|
|
import yaml
|
|
|
-from ruamel.yaml import YAML
|
|
|
+
|
|
|
+try:
|
|
|
+ from ruamel.yaml import YAML
|
|
|
+except ImportError:
|
|
|
+ try:
|
|
|
+ import ruamel.yaml
|
|
|
+
|
|
|
+ YAML = ruamel.yaml.YAML
|
|
|
+ except Exception as e:
|
|
|
+ print('Error: ruamel.yaml not installed. Please install it with: pip install ruamel.yaml')
|
|
|
+ raise e
|
|
|
from urllib.parse import urlparse, parse_qs, unquote
|
|
|
from collections import OrderedDict
|
|
|
import re
|
|
|
|
|
|
+
|
|
|
+def is_valid_ws_path(path):
|
|
|
+ # All '%' must be followed by exactly two hex digits
|
|
|
+ # Regex: '%' not followed by two hex digits is invalid
|
|
|
+ invalid = re.search(r'%($|[^0-9A-Fa-f]{0,2}|[0-9A-Fa-f]($|[^0-9A-Fa-f]))', path)
|
|
|
+ return not invalid
|
|
|
+
|
|
|
+
|
|
|
# ---------- UTILITIES ----------
|
|
|
def download_subscription(sub_url):
|
|
|
resp = requests.get(sub_url)
|
|
|
@@ -163,7 +181,7 @@ def parse_trojan(uri):
|
|
|
params = parse_qs(url.query)
|
|
|
sni = params.get('sni', [None])[0]
|
|
|
|
|
|
- # TLS options
|
|
|
+ # TLS options for Hysteria2
|
|
|
tls_opts = None
|
|
|
if sni:
|
|
|
tls_opts = {'server_name': sni}
|
|
|
@@ -367,10 +385,36 @@ def parse_proxy_line(line):
|
|
|
return parse_tuic(line)
|
|
|
elif line.startswith('wg://'):
|
|
|
return parse_wireguard(line)
|
|
|
+ elif line.startswith(('reality://', 'anytls://')):
|
|
|
+ print(f'[!] WARNING: New or future protocol detected in link: {line[:32]}...')
|
|
|
+ return None # Not yet implemented -- print warning
|
|
|
else:
|
|
|
+ if line.strip():
|
|
|
+ print(f'[!] WARNING: Unknown v2ray/vless/protocol line skipped: {line[:48]}...')
|
|
|
return None
|
|
|
|
|
|
|
|
|
+def validate_proxy(p):
|
|
|
+ # Block injection if domain is well-known public web service or parameters are missing
|
|
|
+ if not p:
|
|
|
+ return False
|
|
|
+ if not p.get('server') or not p.get('port') or not p.get('uuid', ''):
|
|
|
+ return False
|
|
|
+ # Block known public domains (e.g. speedtest.net, npmjs.com, google.com, etc.)
|
|
|
+ public_domains = {
|
|
|
+ 'www.speedtest.net', 'speedtest.net', 'npmjs.com', 'google.com', 'github.com', 'cloudflare.com',
|
|
|
+ 'facebook.com', 'twitter.com', 'spotify.com', 'youtube.com', 'apple.com', 'microsoft.com', 'instagram.com'
|
|
|
+ }
|
|
|
+ server_l = p['server'].lower()
|
|
|
+ if any(domain in server_l for domain in public_domains):
|
|
|
+ return False
|
|
|
+ # Optionally block .ir endpoints (for Iran direct/dns leaks)
|
|
|
+ if server_l.endswith('.ir'):
|
|
|
+ return False
|
|
|
+ # You may add extra filters here if needed
|
|
|
+ return True
|
|
|
+
|
|
|
+
|
|
|
# --- CONFIG PARSING/RENDER ---
|
|
|
def read_yaml_file(yaml_path):
|
|
|
with open(yaml_path, 'r', encoding='utf-8') as f:
|
|
|
@@ -405,20 +449,27 @@ def ascii_name(name):
|
|
|
|
|
|
def update_clash_proxies(clash_cfg, proxies):
|
|
|
yaml_proxies = []
|
|
|
+ name_registry = {}
|
|
|
for p in proxies:
|
|
|
clash_proxy = proxy_to_clash(p)
|
|
|
if clash_proxy:
|
|
|
+ orig_name = ascii_name(clash_proxy['name'])
|
|
|
+ # Ensure unique names for Clash
|
|
|
+ name = orig_name
|
|
|
+ i = 2
|
|
|
+ while name in name_registry:
|
|
|
+ name = f"{orig_name} #{i}"
|
|
|
+ i += 1
|
|
|
+ clash_proxy['name'] = name
|
|
|
+ name_registry[name] = True
|
|
|
yaml_proxies.append(clash_proxy)
|
|
|
-
|
|
|
- proxy_names = [ascii_name(p['name']) for p in yaml_proxies]
|
|
|
+ proxy_names = [p['name'] for p in yaml_proxies]
|
|
|
+ # ... (rest unchanged)
|
|
|
for p in yaml_proxies:
|
|
|
- p['name'] = ascii_name(p['name'])
|
|
|
-
|
|
|
- # Clear any existing proxies and groups
|
|
|
+ p['name'] = p['name']
|
|
|
+ # ... (rest unchanged)
|
|
|
clash_cfg['proxies'] = yaml_proxies
|
|
|
clash_cfg['proxy-groups'] = []
|
|
|
-
|
|
|
- # Create requested groups
|
|
|
auto_group = {
|
|
|
"name": "AUTO",
|
|
|
"type": "url-test",
|
|
|
@@ -507,7 +558,7 @@ def proxy_to_clash(proxy):
|
|
|
reality = proxy['tls_opts']['reality']
|
|
|
clash_proxy['reality-opts'] = {
|
|
|
'public-key': reality['public_key'],
|
|
|
- 'short-id': reality.get('short_id', ''),
|
|
|
+ 'short-id': reality.get('short_id', '')
|
|
|
}
|
|
|
if reality.get('fingerprint'):
|
|
|
clash_proxy['client-fingerprint'] = reality['fingerprint']
|
|
|
@@ -621,8 +672,25 @@ def update_singbox_outbounds(sj, proxies):
|
|
|
def proxy_to_singbox(proxy):
|
|
|
tag = ascii_name(proxy['name'])
|
|
|
|
|
|
+ net = proxy.get('network', 'tcp')
|
|
|
+ # If grpc network, change to tcp and add transport block if possible
|
|
|
+ if net == 'grpc':
|
|
|
+ net = 'tcp'
|
|
|
+ transport = {'type': 'grpc'}
|
|
|
+ if 'serviceName' in proxy:
|
|
|
+ transport['serviceName'] = proxy['serviceName']
|
|
|
+ proxy['transport'] = transport
|
|
|
+ if net not in ('tcp', 'ws', 'wss', 'grpc'):
|
|
|
+ print(f"[!] Skipping proxy with unsupported network type for sing-box: {net} ({proxy['name']})")
|
|
|
+ return None
|
|
|
+ # Now do ws path validation on path fields
|
|
|
+ if net in ('ws', 'wss') and proxy.get('transport') and proxy['transport'].get('path'):
|
|
|
+ path = str(proxy['transport'].get('path'))
|
|
|
+ if not is_valid_ws_path(path):
|
|
|
+ print(f"[FATAL] Skipping proxy with invalid WebSocket path: {path} ({proxy['name']})")
|
|
|
+ return None
|
|
|
if proxy['type'] == 'vmess':
|
|
|
- ws = proxy.get('network') in ('ws', 'wss')
|
|
|
+ ws = net in ('ws', 'wss')
|
|
|
out = OrderedDict([
|
|
|
('type', 'vmess'),
|
|
|
('tag', tag),
|
|
|
@@ -630,7 +698,7 @@ def proxy_to_singbox(proxy):
|
|
|
('server_port', proxy['port']),
|
|
|
('uuid', proxy['uuid']),
|
|
|
('alter_id', int(proxy.get('alterId', '0'))),
|
|
|
- ('network', 'tcp' if ws else proxy.get('network', 'tcp'))
|
|
|
+ ('network', 'tcp' if ws else net)
|
|
|
])
|
|
|
|
|
|
# TLS configuration
|
|
|
@@ -649,32 +717,26 @@ def proxy_to_singbox(proxy):
|
|
|
transport['path'] = proxy['transport']['path']
|
|
|
if transport:
|
|
|
out['transport'] = transport
|
|
|
-
|
|
|
return out
|
|
|
-
|
|
|
elif proxy['type'] == 'vless':
|
|
|
- ws = proxy.get('network') in ('ws', 'wss')
|
|
|
+ ws = net in ('ws', 'wss')
|
|
|
out = OrderedDict([
|
|
|
('type', 'vless'),
|
|
|
('tag', tag),
|
|
|
('server', proxy['server']),
|
|
|
('server_port', proxy['port']),
|
|
|
('uuid', proxy['uuid']),
|
|
|
- ('network', 'tcp' if ws else proxy.get('network', 'tcp'))
|
|
|
+ ('network', 'tcp' if ws else net)
|
|
|
])
|
|
|
-
|
|
|
if proxy.get('flow'):
|
|
|
out['flow'] = proxy['flow']
|
|
|
if proxy.get('encryption'):
|
|
|
out['encryption'] = proxy['encryption']
|
|
|
-
|
|
|
# TLS configuration
|
|
|
if proxy.get('tls'):
|
|
|
tls_config = {}
|
|
|
if proxy.get('tls_opts', {}).get('server_name'):
|
|
|
tls_config['server_name'] = proxy['tls_opts']['server_name']
|
|
|
-
|
|
|
- # Reality configuration
|
|
|
if proxy.get('tls_opts', {}).get('reality'):
|
|
|
reality = proxy['tls_opts']['reality']
|
|
|
tls_config['reality'] = {
|
|
|
@@ -682,9 +744,7 @@ def proxy_to_singbox(proxy):
|
|
|
'public_key': reality['public_key'],
|
|
|
'short_id': reality.get('short_id', '')
|
|
|
}
|
|
|
-
|
|
|
out['tls'] = tls_config
|
|
|
-
|
|
|
# Transport configuration
|
|
|
if ws and proxy.get('transport'):
|
|
|
transport = {}
|
|
|
@@ -793,9 +853,14 @@ if __name__ == '__main__':
|
|
|
proxies = []
|
|
|
for line in lines:
|
|
|
px = parse_proxy_line(line)
|
|
|
- if px:
|
|
|
+ if validate_proxy(px):
|
|
|
proxies.append(px)
|
|
|
|
|
|
+ # NEW: Warn and halt if no valid proxies!
|
|
|
+ if not proxies:
|
|
|
+ print('[FATAL] No valid proxies remain after filtering subscription. Check your sub or filtering policy!')
|
|
|
+ sys.exit(2)
|
|
|
+
|
|
|
print(f"[+] Parsed proxies: {len(proxies)}")
|
|
|
|
|
|
# Show protocol distribution
|