| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887 |
- import sys
- import requests
- import base64
- import json
- import yaml
- try:
- from ruamel.yaml import YAML
- except ImportError:
- try:
- import ruamel.yaml
- YAML = ruamel.yaml.YAML
- except Exception as e:
- print('Error: ruamel.yaml not installed. Please install it with: pip install ruamel.yaml')
- raise e
- from urllib.parse import urlparse, parse_qs, unquote
- from collections import OrderedDict
- import re
def is_valid_ws_path(path):
    """Return True when every '%' in *path* begins a valid percent-escape.

    A valid escape is '%' followed by exactly two hexadecimal digits.
    Paths without any '%' are always valid.

    The previous pattern contained an alternative that could match the
    empty string, so any '%' (even in '%20') was reported as invalid.
    """
    # Negative lookahead: a '%' that is NOT followed by two hex digits.
    return re.search(r'%(?![0-9A-Fa-f]{2})', path) is None
- # ---------- UTILITIES ----------
def download_subscription(sub_url, timeout=30):
    """Download a subscription and return its non-empty lines.

    Args:
        sub_url: URL of the subscription.
        timeout: Seconds to wait for the server before giving up, so a
            stalled server cannot hang the script forever.

    Returns:
        A list of stripped, non-empty lines. When the body looks like a
        base64 blob (pure ASCII and not starting with a known share-link
        scheme) it is decoded first.

    Raises:
        requests.HTTPError: for a non-2xx response (via raise_for_status).
    """
    known_schemes = (
        'vmess://', 'vless://', 'trojan://', 'ss://', 'socks://',
        'hy2://', 'hysteria2://', 'tuic://', 'wg://',
    )
    resp = requests.get(sub_url, timeout=timeout)
    resp.raise_for_status()
    text = resp.text.strip()

    if all(ord(c) < 128 for c in text) and not text.startswith(known_schemes):
        # Drop whitespace/newlines, map the URL-safe alphabet to the standard
        # one and restore stripped padding before decoding.
        compact = ''.join(text.split()).replace('-', '+').replace('_', '/')
        compact = compact.rstrip('=')
        compact += '=' * (-len(compact) % 4)
        try:
            decoded = base64.b64decode(compact).decode('utf-8', errors='ignore')
        except ValueError:  # binascii.Error is a ValueError subclass
            decoded = None
        # Accept the decoded form when it clearly holds share links (this also
        # covers a single-entry subscription) or has more lines than the raw
        # text, which was the original heuristic.
        if decoded is not None and (
                '://' in decoded or decoded.count('\n') > text.count('\n')):
            text = decoded

    return [line.strip() for line in text.splitlines() if line.strip()]
- # --- PROTOCOL PARSERS ---
def parse_vmess(uri):
    """Parse a ``vmess://<base64 JSON>`` share link into the internal dict.

    Returns:
        A dict with keys type, server, port, uuid, alterId, cipher, network,
        transport, tls, tls_opts and name, or None when the link cannot be
        decoded or parsed.
    """
    allowed_ciphers = ('auto', 'chacha20-poly1305', 'aes-128-gcm', 'none')
    payload = uri[8:].strip()
    try:
        # Accept both the standard and the URL-safe base64 alphabet and
        # restore any padding that was stripped from the link.
        b64 = payload.replace('-', '+').replace('_', '/').rstrip('=')
        raw = base64.b64decode(b64 + '=' * (-len(b64) % 4)).decode('utf-8')
        data = json.loads(raw)

        # Some generators store the cipher under 'scy' instead of 'cipher'.
        cipher = str(data.get('cipher') or data.get('scy') or '').lower()
        if cipher not in allowed_ciphers:
            cipher = 'auto'

        net = data.get('net') or data.get('network', 'tcp')
        address = data.get('add')

        # 'host' may be a string or a list; normalise to a single value so it
        # can be used both as the WebSocket Host header and as an SNI fallback.
        host = data.get('host') or data.get('Host')
        if isinstance(host, list):
            host = host[0] if host else None

        ws_opts = None
        if net in ('ws', 'wss'):
            ws_opts = {
                'type': 'ws',
                'path': data.get('path') or '/',
                'host': host or address or None,
            }

        tls_enabled = (data.get('tls', 'none') == 'tls')
        tls_opts = None
        if tls_enabled:
            sni = data.get('sni') or host or address
            tls_opts = {'server_name': sni} if sni else {}

        return {
            'type': 'vmess',
            'server': address,
            'port': int(data.get('port', 0)),
            'uuid': data.get('id'),
            'alterId': data.get('aid', '0'),
            'cipher': cipher,
            'network': net,
            'transport': ws_opts,
            'tls': tls_enabled,
            'tls_opts': tls_opts,
            'name': data.get('ps') or f"vmess_{address}",
        }
    except (ValueError, TypeError, AttributeError):
        # Bad base64 / UTF-8 / JSON, a non-numeric port, or a non-object JSON.
        return None
def parse_vless(uri):
    """Parse ``vless://uuid@host:port?params#remark`` into the internal dict.

    Returns None when the uuid, host or port is missing or when anything
    raises while parsing.
    """
    try:
        parsed = urlparse(uri)
        uuid, server, port = parsed.username, parsed.hostname, parsed.port
        query = parse_qs(parsed.query)

        def first(key, default=None):
            # parse_qs maps each key to a list; take its first value.
            return query.get(key, [default])[0]

        if parsed.fragment:
            tag = unquote(parsed.fragment)
        else:
            tag = f"vless_{server}"

        if not (port and uuid and server):
            return None

        encryption = first('encryption')
        if encryption in (None, '', 'none'):
            encryption = None

        net = first('type', 'tcp')
        path = first('path', '/')
        host = first('host', server)
        sni = first('sni')

        # Transport block is only produced for WebSocket links.
        if net in ('ws', 'wss'):
            ws_opts = {'type': 'ws', 'path': path, 'host': host or server}
        else:
            ws_opts = None

        security = first('security', 'none')
        tls_enabled = security in ('tls', 'reality')
        tls_opts = None
        if tls_enabled:
            tls_opts = {}
            if sni:
                tls_opts['server_name'] = sni
            if security == 'reality':
                public_key = first('pbk')
                short_id = first('sid')
                fingerprint = first('fp')
                # Reality settings are only kept when a public key is given.
                if public_key:
                    tls_opts['reality'] = {
                        'public_key': public_key,
                        'short_id': short_id or '',
                        'fingerprint': fingerprint or 'chrome'
                    }

        return {
            'type': 'vless',
            'server': server,
            'port': int(port),
            'uuid': uuid,
            'encryption': encryption,
            'flow': first('flow', ''),
            'network': net,
            'transport': ws_opts,
            'tls': tls_enabled,
            'tls_opts': tls_opts,
            'name': tag,
        }
    except Exception:
        return None
def parse_trojan(uri):
    """Parse ``trojan://password@host:port?params#remark``.

    Returns the internal proxy dict, or None when the password, host or port
    is missing or parsing raises.
    """
    try:
        parsed = urlparse(uri)
        server, port, password = parsed.hostname, parsed.port, parsed.username
        if parsed.fragment:
            tag = unquote(parsed.fragment)
        else:
            tag = f"trojan_{server}"
        if not (port and password and server):
            return None

        # Only the SNI is extracted from the query string.
        sni = parse_qs(parsed.query).get('sni', [None])[0]
        tls_opts = {'server_name': sni} if sni else None

        return {
            'type': 'trojan',
            'server': server,
            'port': int(port),
            'password': password,
            'tls_opts': tls_opts,
            'name': tag
        }
    except Exception:
        return None
def _decode_ss_base64(text):
    """Decode standard or URL-safe base64 text, with or without padding."""
    normalized = text.strip().replace('-', '+').replace('_', '/').rstrip('=')
    normalized += '=' * (-len(normalized) % 4)
    return base64.b64decode(normalized).decode('utf-8')


def parse_ss(uri):
    """Parse a Shadowsocks share link into the internal proxy dict.

    Supported forms:
      * ``ss://method:password@host:port#tag``          (plain userinfo)
      * ``ss://BASE64(method:password)@host:port#tag``  (SIP002)
      * ``ss://BASE64(method:password@host:port)#tag``  (legacy)

    A trailing ``/?plugin=...`` part after the port is ignored and IPv6
    hosts in brackets are accepted.

    Returns:
        The proxy dict, or None when the link cannot be parsed.
    """
    try:
        # Split the tag off first so an '@' inside the tag cannot confuse the
        # detection of the userinfo separator.
        rest, _, fragment = uri[5:].partition('#')
        tag = unquote(fragment) if fragment else None

        if '@' in rest:
            auth, host_port = rest.rsplit('@', 1)
            if ':' not in auth:
                # SIP002: userinfo is base64("method:password").
                auth = _decode_ss_base64(unquote(auth))
        else:
            raw = _decode_ss_base64(rest.split('?')[0])
            auth, host_port = raw.rsplit('@', 1)

        method, password = auth.split(':', 1)

        # Drop any "/?plugin=..." or "?..." suffix following the port.
        host_port = re.split(r'[/?]', host_port, maxsplit=1)[0]
        host, port = host_port.rsplit(':', 1)
        host = host.strip('[]')  # IPv6 literal
        if not host or not port:
            return None

        return {
            'type': 'ss',
            'server': host,
            'port': int(port),
            'method': method,
            'password': password,
            'name': tag or f"ss_{host}"
        }
    except (ValueError, TypeError):
        # Bad base64/UTF-8, missing separators, or a non-numeric port.
        return None
def parse_socks(uri):
    """Parse ``socks://[username:password@]host:port[#remark]``.

    Missing credentials become empty strings. Returns None when the host or
    port is missing or parsing raises.
    """
    try:
        parsed = urlparse(uri)
        credentials = (parsed.username or '', parsed.password or '')
        server, port = parsed.hostname, parsed.port
        if parsed.fragment:
            tag = unquote(parsed.fragment)
        else:
            tag = f"socks_{server}"
        if not (port and server):
            return None
        return {
            'type': 'socks',
            'server': server,
            'port': int(port),
            'username': credentials[0],
            'password': credentials[1],
            'name': tag
        }
    except Exception:
        return None
def parse_hysteria2(uri):
    """Parse ``hy2://`` / ``hysteria2://password@host:port?params#remark``.

    Returns the internal proxy dict, or None when the password, host or port
    is missing or parsing raises.
    """
    try:
        # Normalise the short scheme to the long one before parsing.
        if uri.startswith('hy2://'):
            uri = uri.replace('hy2://', 'hysteria2://', 1)
        parsed = urlparse(uri)
        server, port, password = parsed.hostname, parsed.port, parsed.username
        if parsed.fragment:
            tag = unquote(parsed.fragment)
        else:
            tag = f"hysteria2_{server}"
        if not (port and password and server):
            return None

        sni = parse_qs(parsed.query).get('sni', [None])[0]
        tls_opts = {'server_name': sni} if sni else None

        return {
            'type': 'hysteria2',
            'server': server,
            'port': int(port),
            'password': password,
            'tls_opts': tls_opts,
            'name': tag
        }
    except Exception:
        return None
def parse_tuic(uri):
    """Parse ``tuic://uuid:password@host:port?params#remark``.

    ``urlparse`` already splits the userinfo at the first ':', so the uuid is
    read from ``username`` and the password from ``password``. Reading both
    from ``username`` (as before) always produced an empty password. A
    percent-encoded ``uuid%3Apassword`` userinfo is still handled.

    Returns:
        The internal proxy dict, or None when the uuid, host or port is
        missing or parsing raises.
    """
    try:
        url = urlparse(uri)
        server = url.hostname
        port = url.port
        user_info = unquote(url.username) if url.username else ''
        tag = unquote(url.fragment) if url.fragment else f"tuic_{server}"
        if not port or not user_info or not server:
            return None

        if ':' in user_info:
            # The separator was percent-encoded inside the userinfo.
            uuid, password = user_info.split(':', 1)
        else:
            uuid = user_info
            password = unquote(url.password) if url.password else ''

        params = parse_qs(url.query)
        sni = params.get('sni', [None])[0]
        tls_opts = {'server_name': sni} if sni else None

        return {
            'type': 'tuic',
            'server': server,
            'port': int(port),
            'uuid': uuid,
            'password': password,
            'tls_opts': tls_opts,
            'name': tag
        }
    except Exception:
        return None
def parse_wireguard(uri):
    """Parse a simplified ``wg://host[:port]?privatekey=..&publickey=..#remark``.

    The port defaults to 51820. Returns None when the host or either key is
    missing or parsing raises.
    """
    try:
        parsed = urlparse(uri)
        server = parsed.hostname
        port = parsed.port if parsed.port else 51820
        if parsed.fragment:
            tag = unquote(parsed.fragment)
        else:
            tag = f"wireguard_{server}"
        if not server:
            return None

        query = parse_qs(parsed.query)
        private_key = query.get('privatekey', [None])[0]
        public_key = query.get('publickey', [None])[0]
        # Both keys are mandatory.
        if not (private_key and public_key):
            return None

        return {
            'type': 'wireguard',
            'server': server,
            'port': int(port),
            'private_key': private_key,
            'public_key': public_key,
            'name': tag
        }
    except Exception:
        return None
def parse_proxy_line(line):
    """Dispatch one subscription line to the parser matching its scheme.

    Returns whatever the selected parser returns. Lines with a recognised but
    unimplemented scheme, or an unknown scheme, yield None after printing a
    warning (blank lines are skipped silently).
    """
    # (prefix or tuple of prefixes, parser) pairs, checked in order.
    handlers = (
        ('vmess://', parse_vmess),
        ('vless://', parse_vless),
        ('trojan://', parse_trojan),
        ('ss://', parse_ss),
        ('socks://', parse_socks),
        (('hy2://', 'hysteria2://'), parse_hysteria2),
        ('tuic://', parse_tuic),
        ('wg://', parse_wireguard),
    )
    for prefix, handler in handlers:
        if line.startswith(prefix):
            return handler(line)

    if line.startswith(('reality://', 'anytls://')):
        # Known scheme without a parser yet.
        print(f'[!] WARNING: New or future protocol detected in link: {line[:32]}...')
        return None

    if line.strip():
        print(f'[!] WARNING: Unknown v2ray/vless/protocol line skipped: {line[:48]}...')
    return None
# Hosts of well-known public services that must never be used as proxy servers.
_PUBLIC_DOMAINS = frozenset({
    'www.speedtest.net', 'speedtest.net', 'npmjs.com', 'google.com',
    'github.com', 'cloudflare.com', 'facebook.com', 'twitter.com',
    'spotify.com', 'youtube.com', 'apple.com', 'microsoft.com',
    'instagram.com',
})

# Protocols whose parsed form carries no 'uuid' field.
_NO_UUID_TYPES = frozenset({'trojan', 'ss', 'socks', 'hysteria2', 'wireguard'})


def validate_proxy(p):
    """Return True when parsed proxy *p* is complete and allowed.

    A proxy is rejected when it is empty, lacks a server or port, lacks a
    uuid while its protocol needs one, points at a well-known public domain
    (or a subdomain of one), or ends in ``.ir``.

    The uuid check is skipped for protocols that authenticate differently;
    requiring it unconditionally rejected every trojan/ss/socks/hysteria2/
    wireguard entry. Domains are matched on label boundaries so unrelated
    hosts that merely contain a blocked name as a substring are kept.
    """
    if not p:
        return False
    if not p.get('server') or not p.get('port'):
        return False
    if p.get('type') not in _NO_UUID_TYPES and not p.get('uuid'):
        return False

    server_l = str(p['server']).lower().rstrip('.')
    for domain in _PUBLIC_DOMAINS:
        if server_l == domain or server_l.endswith('.' + domain):
            return False

    # Optionally block .ir endpoints (for Iran direct/dns leaks)
    if server_l.endswith('.ir'):
        return False
    return True
- # --- CONFIG PARSING/RENDER ---
def read_yaml_file(yaml_path):
    """Load the YAML document at *yaml_path* (UTF-8) with ruamel's YAML()."""
    with open(yaml_path, 'r', encoding='utf-8') as handle:
        loader = YAML()
        return loader.load(handle)
def write_yaml_file(yaml_obj, yaml_path):
    """Dump *yaml_obj* to *yaml_path* (UTF-8) in block style."""
    with open(yaml_path, 'w', encoding='utf-8') as handle:
        dumper = YAML()
        # Block style: nested collections are not written inline.
        dumper.default_flow_style = False
        dumper.dump(yaml_obj, handle)
def read_json_file(path):
    """Return the parsed JSON content of the UTF-8 file at *path*."""
    with open(path, 'r', encoding='utf-8') as handle:
        text = handle.read()
    return json.loads(text)
def write_json_file(obj, path):
    """Serialise *obj* as 2-space indented JSON (non-ASCII kept) to *path*."""
    rendered = json.dumps(obj, indent=2, ensure_ascii=False)
    with open(path, 'w', encoding='utf-8') as handle:
        handle.write(rendered)
- # ---- INJECTION LOGIC ----
def ascii_name(name):
    """Strip everything but ASCII letters, digits, space, '-' and '_'.

    Leading and trailing whitespace of the result is removed as well.
    """
    disallowed = re.compile(r'[^A-Za-z0-9 \-_]')
    return disallowed.sub('', name).strip()
def update_clash_proxies(clash_cfg, proxies):
    """Inject *proxies* into a Clash config and rebuild its proxy groups.

    Args:
        clash_cfg: Parsed Clash template (mapping). ``None`` (an empty
            template file) is treated as an empty mapping.
        proxies: Iterable of internal proxy dicts.

    Returns:
        The updated config with 'proxies' replaced, 'proxy-groups' set to an
        AUTO url-test group plus a PROXY select group, and string entries of
        'rules' normalised so there is no whitespace around commas.

    Proxies that ``proxy_to_clash`` cannot map are skipped. Names are reduced
    to ASCII and made unique by appending " #N". A name that would become
    empty falls back to "<type>_<server>" so no proxy ends up nameless.
    """
    if clash_cfg is None:
        clash_cfg = {}

    yaml_proxies = []
    used_names = set()
    for p in proxies:
        clash_proxy = proxy_to_clash(p)
        if not clash_proxy:
            continue
        base_name = ascii_name(str(clash_proxy['name']))
        if not base_name:
            base_name = f"{clash_proxy['type']}_{clash_proxy['server']}"
        # Ensure unique names for Clash
        name = base_name
        suffix = 2
        while name in used_names:
            name = f"{base_name} #{suffix}"
            suffix += 1
        clash_proxy['name'] = name
        used_names.add(name)
        yaml_proxies.append(clash_proxy)

    proxy_names = [p['name'] for p in yaml_proxies]
    clash_cfg['proxies'] = yaml_proxies
    clash_cfg['proxy-groups'] = [
        {
            "name": "AUTO",
            "type": "url-test",
            "url": "https://www.gstatic.com/generate_204",
            "interval": 300,
            "tolerance": 50,
            "proxies": proxy_names
        },
        {
            "name": "PROXY",
            "type": "select",
            "proxies": ["AUTO"] + proxy_names
        },
    ]

    # Normalise rule formatting: strip whitespace around each comma-separated
    # part; non-string rules are kept untouched.
    if 'rules' in clash_cfg:
        clash_cfg['rules'] = [
            ','.join(part.strip() for part in rule.split(','))
            if isinstance(rule, str) else rule
            for rule in clash_cfg['rules']
        ]
    return clash_cfg
def _clash_ws_opts(transport):
    """Build the Clash 'ws-opts' mapping from an internal transport dict."""
    ws_opts = {'path': transport.get('path', '/'), 'headers': {}}
    if transport.get('host'):
        ws_opts['headers']['Host'] = transport['host']
    return ws_opts


def proxy_to_clash(proxy):
    """Map an internal proxy dict to a Clash Meta proxy entry.

    Supported types: vmess, vless, trojan, ss, socks, hysteria2.

    Returns:
        The Clash proxy dict, or None for any other protocol.

    ``tls_opts`` may be present with the value None (the parsers store None
    when no SNI was given), so it is normalised to an empty dict instead of
    calling ``.get`` on None.
    """
    kind = proxy['type']
    tls_opts = proxy.get('tls_opts') or {}
    server_name = tls_opts.get('server_name')

    if kind == 'vmess':
        clash_proxy = {
            'name': proxy['name'],
            'type': 'vmess',
            'server': proxy['server'],
            'port': proxy['port'],
            'uuid': proxy['uuid'],
            # An empty or missing alterId is treated as 0.
            'alterId': int(proxy.get('alterId') or 0),
            'cipher': proxy.get('cipher', 'auto'),
            'network': proxy.get('network', 'tcp'),
        }
        if proxy.get('tls'):
            clash_proxy['tls'] = True
            if server_name:
                clash_proxy['servername'] = server_name
        if proxy.get('transport'):
            clash_proxy['ws-opts'] = _clash_ws_opts(proxy['transport'])
        return clash_proxy

    if kind == 'vless':
        clash_proxy = {
            'name': proxy['name'],
            'type': 'vless',
            'server': proxy['server'],
            'port': proxy['port'],
            'uuid': proxy['uuid'],
            'network': proxy.get('network', 'tcp'),
        }
        if proxy.get('flow'):
            clash_proxy['flow'] = proxy['flow']
        if proxy.get('encryption'):
            clash_proxy['encryption'] = proxy['encryption']
        if proxy.get('tls'):
            clash_proxy['tls'] = True
            if server_name:
                clash_proxy['servername'] = server_name
            # Reality support
            reality = tls_opts.get('reality')
            if reality:
                clash_proxy['reality-opts'] = {
                    'public-key': reality['public_key'],
                    'short-id': reality.get('short_id', '')
                }
                if reality.get('fingerprint'):
                    clash_proxy['client-fingerprint'] = reality['fingerprint']
        if proxy.get('transport'):
            clash_proxy['ws-opts'] = _clash_ws_opts(proxy['transport'])
        return clash_proxy

    if kind == 'trojan':
        clash_proxy = {
            'name': proxy['name'],
            'type': 'trojan',
            'server': proxy['server'],
            'port': proxy['port'],
            'password': proxy['password'],
        }
        if server_name:
            clash_proxy['servername'] = server_name
        return clash_proxy

    if kind == 'ss':
        return {
            'name': proxy['name'],
            'type': 'ss',
            'server': proxy['server'],
            'port': proxy['port'],
            'cipher': proxy['method'],
            'password': proxy['password']
        }

    if kind == 'socks':
        clash_proxy = {
            'name': proxy['name'],
            'type': 'socks5',
            'server': proxy['server'],
            'port': proxy['port'],
        }
        # Credentials are optional.
        if proxy.get('username'):
            clash_proxy['username'] = proxy['username']
        if proxy.get('password'):
            clash_proxy['password'] = proxy['password']
        return clash_proxy

    if kind == 'hysteria2':
        clash_proxy = {
            'name': proxy['name'],
            'type': 'hysteria2',
            'server': proxy['server'],
            'port': proxy['port'],
            'password': proxy['password'],
        }
        if server_name:
            clash_proxy['sni'] = server_name
        return clash_proxy

    # Unsupported protocol for Clash
    return None
def update_singbox_outbounds(sj, proxies):
    """Replace the proxy outbounds of a sing-box config with *proxies*.

    Existing outbounds of type direct, block, selector and urltest are kept;
    every other existing outbound is dropped and the converted proxies are
    appended. The selector tagged 'proxy' then lists 'auto' plus all proxy
    tags (default 'auto'), and the urltest tagged 'auto' lists all proxy tags.

    Tags are made unique by appending " #N" (matching the Clash path) instead
    of silently dropping proxies whose names collide, and an empty tag falls
    back to "<type>_<server>". Tags keep the order of *proxies* so the output
    is deterministic.

    Returns:
        The same *sj* mapping, modified in place.
    """
    # Outbounds that belong to the template itself and must survive.
    system_outbounds = [
        o for o in sj.get('outbounds', [])
        if o.get('type') in ('direct', 'block', 'selector', 'urltest')
    ]
    # Reserve template tags so a proxy can never shadow e.g. 'auto'.
    used_tags = {o.get('tag') for o in system_outbounds if o.get('tag')}

    new_outbounds = []
    proxy_tags = []
    for p in proxies:
        sbo = proxy_to_singbox(p)
        if not sbo:
            continue
        base_tag = sbo['tag'] or f"{sbo['type']}_{sbo['server']}"
        tag = base_tag
        suffix = 2
        while tag in used_tags:
            tag = f"{base_tag} #{suffix}"
            suffix += 1
        sbo['tag'] = tag
        used_tags.add(tag)
        proxy_tags.append(tag)
        new_outbounds.append(sbo)

    sj['outbounds'] = system_outbounds + new_outbounds

    # Point the template's selector/urltest groups at the new proxy tags.
    for o in sj['outbounds']:
        if o.get('type') == 'selector' and o.get('tag') == 'proxy':
            o['outbounds'] = ['auto'] + proxy_tags
            o['default'] = 'auto'
        elif o.get('type') == 'urltest' and o.get('tag') == 'auto':
            o['outbounds'] = list(proxy_tags)
    return sj
def _singbox_tls(tls_opts, with_reality=False):
    """Build a sing-box outbound 'tls' block from internal TLS options."""
    # 'enabled' is set explicitly so the block actually switches TLS on.
    tls = {'enabled': True}
    if tls_opts.get('server_name'):
        tls['server_name'] = tls_opts['server_name']
    reality = tls_opts.get('reality') if with_reality else None
    if reality:
        tls['reality'] = {
            'enabled': True,
            'public_key': reality['public_key'],
            'short_id': reality.get('short_id', '')
        }
        # NOTE(review): Reality is used together with a uTLS fingerprint;
        # confirm the field names against the sing-box TLS documentation.
        if reality.get('fingerprint'):
            tls['utls'] = {'enabled': True, 'fingerprint': reality['fingerprint']}
    return tls


def _singbox_transport(proxy, net):
    """Build the sing-box 'transport' block for ws/grpc, or return None."""
    if net == 'grpc':
        transport = {'type': 'grpc'}
        if proxy.get('serviceName'):
            transport['service_name'] = proxy['serviceName']
        return transport
    source = proxy.get('transport') or {}
    if net not in ('ws', 'wss') or not source:
        return None
    transport = {}
    if source.get('type'):
        transport['type'] = source['type']
    if source.get('path'):
        transport['path'] = source['path']
    if source.get('host'):
        transport['headers'] = {'Host': source['host']}
    return transport or None


def proxy_to_singbox(proxy):
    """Map an internal proxy dict to a sing-box outbound.

    Supported types: vmess, vless, trojan, ss, socks, hysteria2, tuic.

    Returns:
        The outbound mapping (an OrderedDict for vmess/vless), or None when
        the protocol or network type is unsupported or the WebSocket path
        contains an invalid percent-escape.

    Notes:
        * ``tls_opts`` may be None; it is normalised to an empty dict.
        * The input dict is not mutated (the gRPC branch used to overwrite
          ``proxy['transport']``), and the gRPC transport is now emitted.
    """
    tag = ascii_name(proxy['name'])
    net = proxy.get('network', 'tcp')

    if net not in ('tcp', 'ws', 'wss', 'grpc'):
        print(f"[!] Skipping proxy with unsupported network type for sing-box: {net} ({proxy['name']})")
        return None

    # Reject WebSocket paths with broken percent-escapes.
    source_transport = proxy.get('transport') or {}
    if net in ('ws', 'wss') and source_transport.get('path'):
        path = str(source_transport.get('path'))
        if not is_valid_ws_path(path):
            print(f"[FATAL] Skipping proxy with invalid WebSocket path: {path} ({proxy['name']})")
            return None

    kind = proxy['type']
    tls_opts = proxy.get('tls_opts') or {}

    if kind in ('vmess', 'vless'):
        out = OrderedDict([
            ('type', kind),
            ('tag', tag),
            ('server', proxy['server']),
            ('server_port', proxy['port']),
            ('uuid', proxy['uuid']),
        ])
        if kind == 'vmess':
            out['alter_id'] = int(proxy.get('alterId') or 0)
        # ws/grpc are expressed through 'transport'; every accepted network
        # type maps to 'tcp' here, as it did before.
        out['network'] = 'tcp'
        if kind == 'vless':
            if proxy.get('flow'):
                out['flow'] = proxy['flow']
            if proxy.get('encryption'):
                out['encryption'] = proxy['encryption']
        if proxy.get('tls'):
            out['tls'] = _singbox_tls(tls_opts, with_reality=(kind == 'vless'))
        transport = _singbox_transport(proxy, net)
        if transport:
            out['transport'] = transport
        return out

    if kind in ('trojan', 'hysteria2', 'tuic'):
        out = {
            'type': kind,
            'tag': tag,
            'server': proxy['server'],
            'server_port': proxy['port'],
        }
        if kind == 'tuic':
            out['uuid'] = proxy['uuid']
        out['password'] = proxy['password']
        # A TLS block is only written when an SNI is known.
        if tls_opts.get('server_name'):
            out['tls'] = _singbox_tls(tls_opts)
        return out

    if kind == 'ss':
        return {
            'type': 'shadowsocks',
            'tag': tag,
            'server': proxy['server'],
            'server_port': proxy['port'],
            'method': proxy['method'],
            'password': proxy['password'],
        }

    if kind == 'socks':
        out = {
            'type': 'socks',
            'tag': tag,
            'server': proxy['server'],
            'server_port': proxy['port'],
            'version': '5',
        }
        # Credentials are optional.
        if proxy.get('username'):
            out['username'] = proxy['username']
        if proxy.get('password'):
            out['password'] = proxy['password']
        return out

    # Unsupported protocol for sing-box
    return None
- # ------ MAIN ENTRYPOINT ------
if __name__ == '__main__':
    # Expect exactly five positional arguments after the script name.
    cli_args = sys.argv[1:]
    if len(cli_args) != 5:
        print(
            "Usage: python sub2clash_singbox.py <sub_url> <clash_template.yaml> <singbox_template.json> <output_clash.yaml> <output_singbox.json>")
        sys.exit(1)
    url, clash_tmpl, singbox_tmpl, out_clash, out_sb = cli_args

    print(f"[+] Download: {url}")
    lines = download_subscription(url)
    print(f"[+] {len(lines)} lines found in sub...")

    # Parse each line and keep only the proxies that pass validation.
    proxies = [px for px in map(parse_proxy_line, lines) if validate_proxy(px)]

    # Abort when filtering left nothing to write.
    if not proxies:
        print('[FATAL] No valid proxies remain after filtering subscription. Check your sub or filtering policy!')
        sys.exit(2)
    print(f"[+] Parsed proxies: {len(proxies)}")

    # Count proxies per protocol for the summary line.
    protocol_count = {}
    for proxy in proxies:
        protocol_count.setdefault(proxy['type'], 0)
        protocol_count[proxy['type']] += 1
    print(f"[+] Protocol distribution: {protocol_count}")

    # --- Clash Meta YAML ---
    print("[~] Processing Clash...")
    clash_cfg = update_clash_proxies(read_yaml_file(clash_tmpl), proxies)
    write_yaml_file(clash_cfg, out_clash)
    print(f"[✓] Output Clash config: {out_clash}")

    # --- sing-box JSON ---
    print("[~] Processing Singbox...")
    singbox_cfg = update_singbox_outbounds(read_json_file(singbox_tmpl), proxies)
    write_json_file(singbox_cfg, out_sb)
    print(f"[✓] Output Singbox config: {out_sb}")
    print("Done!")
|