# -*- coding: utf-8 -*- # ============================================================================== # SCRIPT DE PROXY MULTIFILAMENTADO CON LÍMITE DE CONEXIONES Y RATE-LIMITING # # - Este script añade un límite al número máximo de conexiones activas que # el servidor puede manejar en un momento dado. # - Es la solución más efectiva para prevenir la saturación de la CPU en # entornos donde la arquitectura asíncrona no es viable. # - La limitación de velocidad se mantiene para proteger contra ataques de # denegación de servicio. # # Creado por Gemini # ============================================================================== import socket import threading import select import sys import time import os import logging import logging.handlers # ============================================================================== # CONFIGURACIÓN GLOBAL Y SETUP DE LOGGING # ============================================================================== # Direcciones de escucha para ambos protocolos. IPV4_ADDR = '0.0.0.0' IPV6_ADDR = '::' # Puerto de escucha. Se puede pasar como argumento. if sys.argv[1:]: LISTENING_PORT = int(sys.argv[1]) else: LISTENING_PORT = 80 # Contraseña opcional para el proxy. PASS = '' # Controla la prioridad de conexión PRIORITIZE_IPV4 = True # 💡 CONFIGURACIÓN DE SEGURIDAD # Tiempo mínimo de espera (en segundos) entre dos conexiones de la misma IP. CONNECTION_COOLDOWN_TIME = 100 # 💡 NUEVA CONFIGURACIÓN: Límite de conexiones activas MAX_CONNECTIONS = 200 # Máximo de conexiones simultáneas # Constantes BUFLEN = 4096 * 4 TIMEOUT = 60 DEFAULT_HOST = '127.0.0.1:22' RESPONSE = b'HTTP/1.1 101 Switching Protocols nftables 1.1.6\r\n\r\n' # Configuración del log LOG_FILE = '/root/proxy.log' MAX_LOG_SIZE = 5 * 1024 * 1024 # 5 MB BACKUP_COUNT = 5 # Cantidad de archivos de log a rotar def setup_logging(): """Configura el logger profesional con rotación de archivos.""" log_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') file_handler = logging.handlers.RotatingFileHandler( LOG_FILE, maxBytes=MAX_LOG_SIZE, backupCount=BACKUP_COUNT ) file_handler.setFormatter(log_format) console_handler = logging.StreamHandler() console_handler.setFormatter(log_format) logger = logging.getLogger() logger.setLevel(logging.INFO) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger # Inicializar el logger logger = setup_logging() # 💡 Variables compartidas para la limitación de velocidad y el semáforo last_connection_times = {} last_connection_lock = threading.Lock() connection_limit_semaphore = threading.Semaphore(MAX_CONNECTIONS) # ============================================================================== # CLASE DEL SERVIDOR # Gestiona la creación de sockets y la aceptación de conexiones # ============================================================================== class Server(threading.Thread): def __init__(self, port): super().__init__() self.running = False self.port = port self.threads = [] self.threads_lock = threading.Lock() self.ipv4_socket = None self.ipv6_socket = None def run(self): logger.info("\n:-------PythonProxy-------:\n") logger.info(f"Listening addr: {IPV4_ADDR} and {IPV6_ADDR}") logger.info(f"Listening port: {self.port}\n") logger.info(f"Límite de conexiones: {MAX_CONNECTIONS}\n") logger.info(":-------------------------:\n") # Intentar enlazar a IPv4 try: self.ipv4_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.ipv4_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.ipv4_socket.bind((IPV4_ADDR, self.port)) self.ipv4_socket.listen(0) logger.info(f"Esperando conexiones IPv4 en {IPV4_ADDR}:{self.port}") except socket.error as e: logger.error(f"No se pudo enlazar a IPv4 ({e})") if self.ipv4_socket: self.ipv4_socket.close() self.ipv4_socket = None # Intentar enlazar a IPv6 try: self.ipv6_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) self.ipv6_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.ipv6_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) self.ipv6_socket.bind((IPV6_ADDR, self.port, 0, 0)) self.ipv6_socket.listen(0) logger.info(f"Esperando conexiones IPv6 en {IPV6_ADDR}:{self.port}") except socket.error as e: logger.error(f"No se pudo enlazar a IPv6 ({e})") if self.ipv6_socket: self.ipv6_socket.close() self.ipv6_socket = None if not self.ipv4_socket and not self.ipv6_socket: logger.critical("No se pudo iniciar el servidor. Saliendo.") return self.running = True active_sockets = [] if self.ipv4_socket: active_sockets.append(self.ipv4_socket) if self.ipv6_socket: active_sockets.append(self.ipv6_socket) try: while self.running: readable, _, _ = select.select(active_sockets, [], [], 2) for sock in readable: c, addr = sock.accept() client_ip = addr[0] current_time = time.time() # 💡 Lógica del cooldown (rate-limiting) with last_connection_lock: last_time = last_connection_times.get(client_ip, 0) if current_time - last_time < CONNECTION_COOLDOWN_TIME: logger.warning(f"[{client_ip}] Conexión rechazada por rate-limiting.") c.close() continue last_connection_times[client_ip] = current_time # 💡 Intenta adquirir un "slot" del semáforo if not connection_limit_semaphore.acquire(timeout=0): logger.warning(f"[{client_ip}] Conexión rechazada. Límite de conexiones alcanzado.") c.close() continue c.setblocking(1) conn = ConnectionHandler(c, self, addr) conn.start() self.add_conn(conn) except Exception as e: logger.error(f"Error en el bucle principal del servidor: {e}") finally: self.running = False if self.ipv4_socket: self.ipv4_socket.close() if self.ipv6_socket: self.ipv6_socket.close() def add_conn(self, conn): """Añade un hilo de conexión a la lista de hilos activos de forma segura.""" with self.threads_lock: if self.running: self.threads.append(conn) def remove_conn(self, conn): """Remueve un hilo de conexión de la lista de hilos activos de forma segura.""" with self.threads_lock: if conn in self.threads: self.threads.remove(conn) def close(self): """Cierra el servidor y todos los hilos de conexión.""" self.running = False with self.threads_lock: threads = list(self.threads) for c in threads: c.close() # ============================================================================== # CLASE MANEJADORA DE CONEXIONES # Gestiona la lógica de cada conexión de cliente en un hilo separado # ============================================================================== class ConnectionHandler(threading.Thread): def __init__(self, client_socket, server, addr): super().__init__() self.client_closed = False self.target_closed = True self.client = client_socket self.client_buffer = b'' self.server = server self.addr = addr self.log_prefix = f"{addr[0]}:{addr[1]}" logger.info(f"Nueva conexión de {self.log_prefix}") self.target = None def close(self): """Cierra los sockets del cliente y el destino.""" logger.info(f"Cerrando conexión {self.log_prefix}") try: if not self.client_closed: self.client.shutdown(socket.SHUT_RDWR) self.client.close() except Exception: pass finally: self.client_closed = True try: if not self.target_closed: self.target.shutdown(socket.SHUT_RDWR) self.target.close() except Exception: pass finally: self.target_closed = True # 💡 Libera el semáforo al cerrar la conexión connection_limit_semaphore.release() def run(self): try: self.client_buffer = self.client.recv(BUFLEN) if not self.client_buffer: return headers = self.client_buffer.decode('latin-1') logger.debug(f"Headers received from {self.log_prefix}:\n{headers.strip()}") host_port = self.find_header(headers, 'X-Real-Host') if not host_port: host_port = DEFAULT_HOST if self.find_header(headers, 'X-Split'): self.client.recv(BUFLEN) if host_port: passwd = self.find_header(headers, 'X-Pass') if PASS and passwd == PASS: logger.info(f"Autenticación exitosa para {self.log_prefix} -> {host_port}") self.method_connect(host_port) elif PASS and passwd != PASS: logger.warning(f"Fallo de autenticación para {self.log_prefix} -> {host_port}") self.client.send(b'HTTP/1.1 400 WrongPass!\r\n\r\n') elif host_port.startswith('127.0.0.1') or host_port.startswith('localhost'): logger.info(f"Conexión local permitida para {self.log_prefix} -> {host_port}") self.method_connect(host_port) else: logger.warning(f"Acceso denegado (sin contraseña) para {self.log_prefix} -> {host_port}") self.client.send(b'HTTP/1.1 403 Forbidden!\r\n\r\n') else: logger.error(f"Encabezado 'X-Real-Host' no encontrado en la conexión de {self.log_prefix}") self.client.send(b'HTTP/1.1 400 NoXRealHost!\r\n\r\n') except Exception as e: logger.error(f"Error inesperado en el hilo de conexión {self.log_prefix}: {e}") finally: self.close() self.server.remove_conn(self) def find_header(self, head, header): """Busca un encabezado en la solicitud HTTP.""" try: aux = head.find(header + ': ') if aux == -1: return '' head = head[aux + len(header) + 2:] aux = head.find('\r\n') if aux == -1: return '' return head[:aux] except Exception as e: logger.error(f"Error al analizar el encabezado '{header}': {e}") return '' def connect_target(self, host_port): """Intenta conectarse al host de destino con una lógica de reconexión.""" i = host_port.find(':') if i != -1: try: port = int(host_port[i+1:]) host = host_port[:i] except ValueError: raise RuntimeError(f"Puerto inválido: {host_port[i+1:]}") else: host = host_port port = 22 # Puerto por defecto try: addr_info = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM) except socket.gaierror as e: raise RuntimeError(f"Error de resolución de DNS para {host}: {e}") if PRIORITIZE_IPV4: prioritized_addrs = sorted(addr_info, key=lambda x: x[0] != socket.AF_INET) else: prioritized_addrs = addr_info logger.info(f"Intentando conectar a {host}:{port}. Direcciones disponibles: {[res[4] for res in prioritized_addrs]}") for res in prioritized_addrs: soc_family, soc_type, proto, _, address = res address_str = address[0] try: self.target = socket.socket(soc_family, soc_type, proto) self.target.connect(address) self.target_closed = False logger.info(f"Conexión exitosa a {address_str} (Familia: {soc_family})") return except socket.error as e: logger.warning(f"Error al conectar a {address_str}: {e}. Intentando la siguiente dirección...") if self.target: self.target.close() self.target = None if self.target is None: raise RuntimeError(f"No se pudo establecer una conexión con el host de destino: {host}") def method_connect(self, path): """Maneja el túnel de datos una vez que la conexión se ha establecido.""" try: self.connect_target(path) self.client.sendall(RESPONSE) self.do_connect() except RuntimeError as e: logger.error(f"Error en la conexión del método CONNECT para {self.log_prefix}: {e}") self.client.send(b'HTTP/1.1 502 Bad Gateway\r\n\r\n') except Exception as e: logger.error(f"Error inesperado en method_connect para {self.log_prefix}: {e}") self.client.send(b'HTTP/1.1 500 Internal Server Error\r\n\r\n') finally: self.client_buffer = b'' def do_connect(self): """Bucle principal para el túnel de datos.""" socs = [self.client, self.target] count = 0 while True: try: readable, _, err = select.select(socs, [], socs, 3) if err: break if readable: for sock in readable: data = sock.recv(BUFLEN) if data: if sock is self.target: self.client.send(data) else: self.target.sendall(data) count = 0 else: logger.info(f"Conexión con {self.log_prefix} terminada.") return if count == TIMEOUT: logger.info(f"Conexión con {self.log_prefix} excedió el tiempo de espera.") return count += 1 except (socket.error, socket.timeout) as e: logger.error(f"Error en el túnel de datos para {self.log_prefix}: {e}") return except Exception as e: logger.error(f"Error inesperado en do_connect para {self.log_prefix}: {e}") return # ============================================================================== # LÓGICA PRINCIPAL # ============================================================================== def main(): server = Server(LISTENING_PORT) server.start() try: while True: time.sleep(2) except KeyboardInterrupt: logger.info('Deteniendo el servidor...') server.close() server.join() sys.exit(0) if __name__ == '__main__': main()