# -*- coding: utf-8 -*-
# ==============================================================================
# PROXY VPN v6 TLS - CUSTOM EDITION
# Tuned with active-scan evasion (400 OK) and Stunnel/SSH support
# ==============================================================================
import socket
import threading
import select
import sys
import time
import itertools
import os
import ssl

# --- BASE CONFIGURATION ---
DEFAULT_PORT = 443


def parse_listening_port(argv, default=DEFAULT_PORT):
    """Return the listening port taken from ``argv[1]``.

    Falls back to *default* when the argument is missing, is not an integer,
    or lies outside 0-65535. A warning is written to stderr for an invalid
    value so that a typo does not abort the process with a traceback.
    """
    if len(argv) <= 1:
        return default
    try:
        port = int(argv[1])
    except ValueError:
        port = -1
    if 0 <= port <= 65535:
        return port
    print(
        f"Puerto inválido {argv[1]!r}; usando {default}",
        file=sys.stderr,
    )
    return default


LISTENING_PORT = parse_listening_port(sys.argv)
SSH_HOST = '127.0.0.1'
SSH_PORT = 22  # Local SSH port (Dropbear or OpenSSH)
LOG_FILE = "/root/proxy-ssl.log"
MAX_LOG_SIZE = 10 * 1024 * 1024

# --- SSL/TLS CONFIGURATION ---
USE_SSL = True
CERT_FILE = "/root/cert.pem"
KEY_FILE = "/root/key.pem"

# --- ADVANCED SECURITY CONFIGURATION ---
MAX_CONNECTIONS = 200
CONNECTION_COOLDOWN = 0.5
BUFLEN = 16384
AUTO_BAN_STRIKES = 3
BAN_TIME = 3600
# Upper bound (seconds) for a client to finish the TLS handshake.
HANDSHAKE_TIMEOUT = 5.0
banned_ips_memory = {}  # ip -> epoch time at which the ban expires
ip_strikes = {}  # ip -> strike count, "<ip>_last" -> last accepted time
ip_strikes_lock = threading.Lock()  # guards both dictionaries above

# --- FAKE WEB RESPONSE (ANTI ACTIVE PROBING) ---
# The unusual "400 OK" status line is intentional: it is meant to confuse
# security scanners.
# NOTE(review): the HTML tags of this body were stripped in the copy under
# review (only the text nodes and newlines survived); the markup below is a
# reconstruction - confirm against the deployed page.
FAKE_WEB_RESPONSE = (
    b"HTTP/1.1 400 OK\r\n"
    b"Server: nginx/1.21.0\r\n"
    b"Content-Type: text/html; charset=UTF-8\r\n"
    b"Connection: close\r\n\r\n"
    b"<!DOCTYPE html>\n<html>\n<head><title>Error</title></head>\n"
    b"<body>\n"
    b"<h1>Hola</h1>\n<p>400 Bad Request</p>\n"
    b"</body>\n</html>\n"
)

# --- CUSTOM HEADERS FOR THE VPN HANDSHAKE RESPONSE ---
CUSTOM_HEADERS = {
    "Server": "nginx/1.21.0",
    "X-Forwarded-For": "127.0.0.1",
    "Content-Type": "text/html; charset=UTF-8",
    "Proxy-Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "X-Proxy-Agent": "Gemini-Ultra-Robust-v6-TLS",
    "X-Forwarded-For-Proxy": "True"
}

# Status texts rotated through the "101" response line, one per tunnel.
MENSAJES = [
    "🚀 CONEXION TLS ESTABLECIDA",
    "🛡️ CIFRADO MILITAR ACTIVO",
    "🔋 MODO SIGILO SSL OK",
    "Pfsense",
    "OPNsense",
    "VyOS",
    "Claro",
    "Windows Server",
    "BSD Free",
    "Altice",
    "Viva",
    "Google",
    "TNSR",
    "🌐 BYPASS DE FIREWALL OK"
]

mensaje_cycle = itertools.cycle(MENSAJES)
cycle_lock = threading.Lock()  # itertools.cycle is shared across threads


def log(msg, addr=None):
    """Append *msg* to LOG_FILE and echo it to stdout (best effort).

    Args:
        msg: text of the log line.
        addr: optional peer address tuple; only ``addr[0]`` (the IP) is used.

    The log file is truncated and restarted once it grows past MAX_LOG_SIZE.
    Logging never raises: I/O and encoding failures are ignored so that a
    read-only disk or a non-UTF-8 console cannot take a connection down.
    """
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    client_info = f" [{addr[0]}]" if addr else ""
    log_entry = f"[{timestamp}]{client_info} {msg}\n"

    try:
        if (os.path.exists(LOG_FILE)
                and os.path.getsize(LOG_FILE) > MAX_LOG_SIZE):
            with open(LOG_FILE, 'w', encoding='utf-8') as f:
                f.write(f"[{timestamp}] LOG REINICIADO\n")
        # Explicit UTF-8: the messages contain emoji, which would fail to
        # encode under a C/POSIX locale default.
        with open(LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(log_entry)
    except OSError:
        pass

    # Echo to the console even when the file could not be written.
    try:
        print(log_entry.strip())
    except (OSError, ValueError):  # ValueError covers UnicodeEncodeError
        pass


def is_banned(client_ip, now=None):
    """Return True while *client_ip* has an unexpired temporary ban.

    An expired ban entry is removed as a side effect. *now* defaults to
    ``time.time()`` and exists mainly to make the function testable.
    """
    if now is None:
        now = time.time()
    with ip_strikes_lock:
        expiry = banned_ips_memory.get(client_ip)
        if expiry is None:
            return False
        if now < expiry:
            return True
        banned_ips_memory.pop(client_ip, None)
        return False


def register_attempt(client_ip, now=None):
    """Apply the flood control rule to one connection attempt.

    A connection arriving less than CONNECTION_COOLDOWN seconds after the
    last *accepted* one from the same IP is rejected and counted as a strike;
    at AUTO_BAN_STRIKES strikes the IP is banned for BAN_TIME seconds. An
    accepted connection resets the strike counter.

    Returns:
        ``(allowed, banned)`` - *allowed* tells whether to serve this
        connection, *banned* whether this attempt put the IP on the ban list.
    """
    if now is None:
        now = time.time()
    last_key = f"{client_ip}_last"
    with ip_strikes_lock:
        if (now - ip_strikes.get(last_key, 0)) < CONNECTION_COOLDOWN:
            strikes = ip_strikes.get(client_ip, 0) + 1
            ip_strikes[client_ip] = strikes
            banned = strikes >= AUTO_BAN_STRIKES
            if banned:
                banned_ips_memory[client_ip] = now + BAN_TIME
            return False, banned
        ip_strikes[last_key] = now
        ip_strikes[client_ip] = 0
        return True, False


active_connections = 0
conn_lock = threading.Lock()  # guards active_connections


class ConnectionHandler(threading.Thread):
    """Serve one accepted client: classify it, then relay it to local SSH.

    The creator must have incremented ``active_connections`` before calling
    ``start()``; ``run()`` always decrements it again when it finishes.
    """

    def __init__(self, client_socket, addr):
        super().__init__(daemon=True)
        self.client = client_socket
        self.addr = addr
        self.target = None  # socket to SSH_HOST:SSH_PORT, opened in run()
        self.tx_bytes = 0  # bytes relayed client -> target
        self.rx_bytes = 0  # bytes relayed target -> client

    def build_http_response(self, status_msg):
        """Return the encoded "101" upgrade response carrying *status_msg*."""
        headers_str = "".join(
            f"{k}: {v}\r\n" for k, v in CUSTOM_HEADERS.items()
        )
        return (
            f"HTTP/1.1 101 {status_msg}\r\n{headers_str}"
            "Connection: Upgrade\r\nUpgrade: websocket\r\n\r\n"
        ).encode('utf-8')

    @staticmethod
    def classify_payload(payload):
        """Classify the first bytes received from a client.

        Returns one of:
            ``"silent"``    - nothing was sent (e.g. stunnel waiting for the
                              SSH banner),
            ``"ssh"``       - a raw SSH banner, to be forwarded verbatim,
            ``"probe"``     - an HTTP request without a websocket upgrade,
            ``"websocket"`` - anything else; answered with the 101 response.
        """
        if not payload:
            return "silent"
        if payload.startswith(b"SSH-"):
            return "ssh"
        # Header names and the "websocket" token are case-insensitive in
        # HTTP, so compare on a lowered copy.
        if b"HTTP/" in payload and b"upgrade: websocket" not in payload.lower():
            return "probe"
        return "websocket"

    def run(self):
        """Thread entry point; never raises and always releases the slot."""
        global active_connections
        try:
            self._serve()
        except OSError:
            # Resets, timeouts and failed TLS handshakes are routine here.
            pass
        except Exception as e:
            log(f"❌ Error inesperado: {e!r}", self.addr)
        finally:
            with conn_lock:
                active_connections -= 1
            self.cleanup()

    def _serve(self):
        """Run the admission checks, answer the client and start relaying."""
        client_ip = self.addr[0]

        # 1. Temporary ban check (before spending CPU on a TLS handshake).
        if is_banned(client_ip):
            return

        # TLS handshake runs here, in the per-connection thread and with a
        # timeout, so a stalled client cannot block the accept loop.
        if isinstance(self.client, ssl.SSLSocket):
            self.client.settimeout(HANDSHAKE_TIMEOUT)
            self.client.do_handshake()

        # 2. Flood control (rate limiting).
        allowed, banned = register_attempt(client_ip)
        if banned:
            log("⛔ IP Baneada (Flood/Spam)", self.addr)
        if not allowed:
            return

        # 3. Read the initial payload.
        self.client.settimeout(2.0)
        try:
            payload = self.client.recv(BUFLEN)
        except socket.timeout:
            payload = b""  # silent stunnel mode: client waits for us
        else:
            if not payload:
                return  # peer closed before sending anything

        # 4. Scanners get the fake page and never reach the SSH backend.
        mode = self.classify_payload(payload)
        if mode == "probe":
            log("🕵️ Escáner detectado. Respondiendo 400 OK Fake Web.",
                self.addr)
            self.client.sendall(FAKE_WEB_RESPONSE)
            return

        # 5. Connect to the SSH destination.
        try:
            self.target = socket.create_connection(
                (SSH_HOST, SSH_PORT), timeout=5
            )
        except OSError as e:
            log(f"❌ Error destino: {e}", self.addr)
            return

        # 6. Answer according to the kind of traffic.
        if mode == "ssh":
            log("✅ Túnel (Modo SSH Directo)", self.addr)
            self.target.sendall(payload)
        elif mode == "websocket":
            with cycle_lock:
                current_status = next(mensaje_cycle)
            self.client.sendall(self.build_http_response(current_status))
            log(f"✅ Túnel (Modo WebSocket HTTP): {current_status}",
                self.addr)
        else:
            log("✅ Túnel (Modo Stunnel Silencioso)", self.addr)

        self.tunnel()

    def tunnel(self):
        """Relay bytes both ways until a side closes, errors or idles 300 s."""
        self.client.settimeout(None)
        self.target.settimeout(None)
        sockets = [self.client, self.target]

        while True:
            try:
                readable, _, errored = select.select(sockets, [], sockets, 300)
                if errored or not readable:
                    break
                for s in readable:
                    data = s.recv(BUFLEN)
                    if not data:
                        return
                    if s is self.client:
                        self.target.sendall(data)
                        self.tx_bytes += len(data)
                    else:
                        self.client.sendall(data)
                        self.rx_bytes += len(data)
            except (OSError, ValueError):
                # ValueError: select() on a socket that was already closed.
                break

    def cleanup(self):
        """Log the session total (if above 0.05 MB) and close both sockets."""
        total_mb = (self.tx_bytes + self.rx_bytes) / (1024 * 1024)
        if total_mb > 0.05:
            log(f"[*] Cierre de sesión. Tráfico: {total_mb:.2f} MB", self.addr)

        for s in (self.client, self.target):
            if s:
                try:
                    s.close()
                except OSError:
                    pass


def _open_listener(port):
    """Return a listening TCP socket on *port*, dual-stack when possible."""
    addr_info = socket.getaddrinfo(
        None, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE
    )
    # Prefer IPv6 so that one socket can serve both address families.
    addr_info.sort(key=lambda x: x[0] == socket.AF_INET6, reverse=True)
    af, socktype, proto, _canonname, sa = addr_info[0]

    server = socket.socket(af, socktype, proto)
    try:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if af == socket.AF_INET6:
            try:
                server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
            except (OSError, AttributeError):
                pass  # platform without dual-stack support
        server.bind(sa)
        server.listen(600)
    except OSError:
        server.close()
        raise
    return server


def main():
    """Load the certificate, open the listener and run the accept loop."""
    global active_connections

    # Prepare the TLS server context (skipped when USE_SSL is disabled).
    context = None
    if USE_SSL:
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        try:
            context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)
        except Exception as e:
            print(f"Error certificados: {e}")
            sys.exit(1)

    # Start the dual-stack server.
    server = None
    try:
        server = _open_listener(LISTENING_PORT)

        log("=====================================================")
        log(f"🚀 Servidor v6 TLS Personalizado - Puerto {LISTENING_PORT}")
        log("🛡️ Backlog: 600 | Anti-Probing: 400 OK")
        log("=====================================================")

        while True:
            try:
                raw_c, addr = server.accept()
            except OSError:
                time.sleep(0.05)
                continue

            # Reserve a slot before doing any per-connection work.
            with conn_lock:
                has_slot = active_connections < MAX_CONNECTIONS
                if has_slot:
                    active_connections += 1
            if not has_slot:
                raw_c.close()
                continue

            try:
                if context is not None:
                    # Handshake is deferred to the handler thread so a slow
                    # or silent client cannot stall this loop.
                    client = context.wrap_socket(
                        raw_c,
                        server_side=True,
                        do_handshake_on_connect=False,
                    )
                else:
                    client = raw_c
                ConnectionHandler(client, addr).start()
            except (OSError, RuntimeError):
                # RuntimeError: the interpreter could not start a thread.
                # The handler never ran, so release its slot here.
                with conn_lock:
                    active_connections -= 1
                raw_c.close()
    except KeyboardInterrupt:
        log("🛑 Servidor detenido por el usuario")
    except Exception as e:
        log(f"Error crítico: {e}")
    finally:
        if server is not None:
            server.close()


if __name__ == "__main__":
    main()