| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- # -*- coding: utf-8 -*-
- # ==============================================================================
- # SCRIPT DE PROXY MULTIFILAMENTADO CON LÍMITE DE CONEXIONES Y RATE-LIMITING
- #
- # - Este script añade un límite al número máximo de conexiones activas que
- # el servidor puede manejar en un momento dado.
- # - Es la solución más efectiva para prevenir la saturación de la CPU en
- # entornos donde la arquitectura asíncrona no es viable.
- # - La limitación de velocidad se mantiene para proteger contra ataques de
- # denegación de servicio.
- #
- # Creado por Gemini
- # ==============================================================================
- import socket
- import threading
- import select
- import sys
- import time
- import os
- import logging
- import logging.handlers
- # ==============================================================================
- # CONFIGURACIÓN GLOBAL Y SETUP DE LOGGING
- # ==============================================================================
# Listening addresses for both protocols.
IPV4_ADDR = '0.0.0.0'
IPV6_ADDR = '::'


def parse_listening_port(argv, default=80):
    """Return the listening port taken from the first command-line argument.

    Args:
        argv: Command-line arguments without the program name.
        default: Port returned when no argument is given or it is unusable.

    Returns:
        An integer in the range 0-65535. A non-numeric or out-of-range
        argument is reported on stderr and replaced by ``default`` instead of
        crashing the script at import time.
    """
    if not argv:
        return default
    try:
        port = int(argv[0])
    except ValueError:
        port = -1
    if 0 <= port <= 65535:
        return port
    sys.stderr.write(f"Puerto inválido {argv[0]!r}; usando {default}.\n")
    return default


# Listening port. It may be passed as the first command-line argument.
LISTENING_PORT = parse_listening_port(sys.argv[1:])

# Optional password for the proxy (empty string disables the check).
PASS = ''

# Controls which address family is tried first when connecting to a target.
PRIORITIZE_IPV4 = True

# SECURITY SETTINGS
# Minimum wait (in seconds) between two connections from the same IP.
CONNECTION_COOLDOWN_TIME = 100

# Cap on simultaneously active connections.
MAX_CONNECTIONS = 200

# Constants
BUFLEN = 4096 * 4
TIMEOUT = 60
DEFAULT_HOST = '127.0.0.1:22'
RESPONSE = b'HTTP/1.1 101 Switching Protocols <strong>nftables 1.1.6</strong>\r\n\r\n'

# Log configuration
LOG_FILE = '/root/proxy.log'
MAX_LOG_SIZE = 5 * 1024 * 1024  # 5 MB
BACKUP_COUNT = 5  # Number of rotated log files to keep
def setup_logging(log_file=None, max_bytes=None, backup_count=None):
    """Configure the root logger with a rotating file handler and a console handler.

    Args:
        log_file: Path of the log file. Defaults to the module-level LOG_FILE.
        max_bytes: Size at which the file is rotated. Defaults to MAX_LOG_SIZE.
        backup_count: Number of rotated files kept. Defaults to BACKUP_COUNT.

    Returns:
        The root logger, set to INFO level.

    If the log file cannot be opened (missing directory, no permission), the
    proxy keeps running with console logging only and a warning is logged.
    Calling the function again does not attach the handlers a second time.
    """
    root = logging.getLogger()

    # Handlers installed here are tagged, so a repeated call can detect them
    # and avoid emitting every record twice.
    if any(getattr(h, '_proxy_handler', False) for h in root.handlers):
        return root

    log_file = LOG_FILE if log_file is None else log_file
    max_bytes = MAX_LOG_SIZE if max_bytes is None else max_bytes
    backup_count = BACKUP_COUNT if backup_count is None else backup_count

    log_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    root.setLevel(logging.INFO)

    file_error = None
    try:
        file_handler = logging.handlers.RotatingFileHandler(
            log_file,
            maxBytes=max_bytes,
            backupCount=backup_count,
        )
    except OSError as exc:
        # Remember the failure; it is reported once the console handler exists.
        file_error = exc
    else:
        file_handler.setFormatter(log_format)
        file_handler._proxy_handler = True
        root.addHandler(file_handler)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(log_format)
    console_handler._proxy_handler = True
    root.addHandler(console_handler)

    if file_error is not None:
        root.warning(
            f"No se pudo abrir el archivo de log {log_file} ({file_error}); "
            "se usará solo la consola."
        )

    return root
# Process-wide logger used by every component in this file.
logger = setup_logging()

# Shared state: the semaphore caps simultaneously active connections, and the
# dictionary (guarded by its lock) stores the last accepted time per client IP
# for the cooldown check.
connection_limit_semaphore = threading.Semaphore(MAX_CONNECTIONS)
last_connection_lock = threading.Lock()
last_connection_times = dict()
- # ==============================================================================
- # CLASE DEL SERVIDOR
- # Gestiona la creación de sockets y la aceptación de conexiones
- # ==============================================================================
class Server(threading.Thread):
    """Listener thread that accepts IPv4/IPv6 clients and spawns handlers.

    Each accepted client must pass the per-IP cooldown and the global
    connection cap before a ConnectionHandler thread is started for it.
    """

    def __init__(self, port):
        """Store the listening port; the sockets are created in run()."""
        super().__init__()
        self.running = False
        self.port = port
        self.threads = []
        self.threads_lock = threading.Lock()
        self.ipv4_socket = None
        self.ipv6_socket = None
        # Time of the last sweep of expired cooldown entries.
        self._last_prune = 0.0

    def _open_listener(self, family, bind_addr, label):
        """Create, bind and listen on one socket; return it, or None on failure."""
        sock = None
        try:
            sock = socket.socket(family, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if family == socket.AF_INET6:
                # Keep the IPv6 socket IPv6-only so it does not clash with
                # the separate IPv4 listener on the same port.
                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
            sock.bind(bind_addr)
            # NOTE(review): backlog 0 keeps the pending queue minimal --
            # confirm this is intended for bursty clients.
            sock.listen(0)
        except (OSError, OverflowError) as e:
            # OverflowError is what bind() raises for a port outside 0-65535.
            logger.error(f"No se pudo enlazar a {label} ({e})")
            if sock is not None:
                sock.close()
            return None
        logger.info(f"Esperando conexiones {label} en {bind_addr[0]}:{self.port}")
        return sock

    def _is_rate_limited(self, client_ip, now):
        """Return True if client_ip connected less than the cooldown ago.

        When the connection is allowed, ``now`` is recorded as the client's
        last connection time. Expired entries are swept at most once per
        cooldown period so the table cannot grow without bound.
        """
        with last_connection_lock:
            if now - self._last_prune >= CONNECTION_COOLDOWN_TIME:
                expired = [
                    ip for ip, seen in last_connection_times.items()
                    if now - seen >= CONNECTION_COOLDOWN_TIME
                ]
                for ip in expired:
                    del last_connection_times[ip]
                self._last_prune = now

            last_seen = last_connection_times.get(client_ip, 0)
            if now - last_seen < CONNECTION_COOLDOWN_TIME:
                return True
            last_connection_times[client_ip] = now
            return False

    def _accept_one(self, listener):
        """Accept one pending client and start a handler thread for it."""
        try:
            client, addr = listener.accept()
        except OSError as e:
            # A single failed accept (client gone, descriptor shortage) must
            # not stop the whole server; pause briefly to avoid a busy loop
            # if the condition persists.
            logger.warning(f"Error al aceptar una conexión: {e}")
            time.sleep(0.1)
            return

        client_ip = addr[0]

        # Cooldown (rate limiting) per client IP.
        if self._is_rate_limited(client_ip, time.time()):
            logger.warning(f"[{client_ip}] Conexión rechazada por rate-limiting.")
            client.close()
            return

        # Try to take a slot from the global connection cap without waiting.
        if not connection_limit_semaphore.acquire(blocking=False):
            logger.warning(f"[{client_ip}] Conexión rechazada. Límite de conexiones alcanzado.")
            client.close()
            return

        client.setblocking(1)
        conn = ConnectionHandler(client, self, addr)
        # Register before starting: a handler that finishes immediately would
        # otherwise call remove_conn() first and leave a stale entry behind.
        self.add_conn(conn)
        try:
            conn.start()
        except RuntimeError as e:
            logger.error(f"[{client_ip}] No se pudo iniciar el hilo de conexión: {e}")
            self.remove_conn(conn)
            # close() shuts the client socket and gives the slot back.
            conn.close()

    def run(self):
        """Bind the listeners and accept clients until the server is stopped."""
        logger.info("\n:-------PythonProxy-------:\n")
        logger.info(f"Listening addr: {IPV4_ADDR} and {IPV6_ADDR}")
        logger.info(f"Listening port: {self.port}\n")
        logger.info(f"Límite de conexiones: {MAX_CONNECTIONS}\n")
        logger.info(":-------------------------:\n")

        self.ipv4_socket = self._open_listener(
            socket.AF_INET, (IPV4_ADDR, self.port), "IPv4")
        self.ipv6_socket = self._open_listener(
            socket.AF_INET6, (IPV6_ADDR, self.port, 0, 0), "IPv6")

        active_sockets = [
            s for s in (self.ipv4_socket, self.ipv6_socket) if s is not None
        ]
        if not active_sockets:
            logger.critical("No se pudo iniciar el servidor. Saliendo.")
            return

        self.running = True
        try:
            while self.running:
                # The 2-second timeout lets the loop notice running == False.
                readable, _, _ = select.select(active_sockets, [], [], 2)
                for sock in readable:
                    self._accept_one(sock)
        except Exception as e:
            logger.error(f"Error en el bucle principal del servidor: {e}")
        finally:
            self.running = False
            for sock in active_sockets:
                sock.close()

    def add_conn(self, conn):
        """Add a connection thread to the active list (only while running)."""
        with self.threads_lock:
            if self.running:
                self.threads.append(conn)

    def remove_conn(self, conn):
        """Remove a connection thread from the active list if it is present."""
        with self.threads_lock:
            if conn in self.threads:
                self.threads.remove(conn)

    def close(self):
        """Stop the accept loop and close every registered connection."""
        self.running = False
        with self.threads_lock:
            # Work on a snapshot: handlers remove themselves while closing.
            threads = list(self.threads)
            for c in threads:
                c.close()
- # ==============================================================================
- # CLASE MANEJADORA DE CONEXIONES
- # Gestiona la lógica de cada conexión de cliente en un hilo separado
- # ==============================================================================
class ConnectionHandler(threading.Thread):
    """Per-client thread: reads the request, connects to the target, tunnels data.

    The first chunk received from the client is searched for the
    ``X-Real-Host``, ``X-Pass`` and ``X-Split`` headers. Once the target is
    connected, bytes are relayed in both directions until either side closes
    or the tunnel stays idle for too long.
    """

    # ASCII-only case mapping. It never changes the length of the text, so
    # offsets found in the folded copy are valid in the original string.
    _ASCII_LOWER = str.maketrans('ABCDEFGHIJKLMNOPQRSTUVWXYZ',
                                 'abcdefghijklmnopqrstuvwxyz')

    def __init__(self, client_socket, server, addr):
        """Remember the client socket, the owning server and the peer address."""
        super().__init__()
        self.client_closed = False
        self.target_closed = True
        self.client = client_socket
        self.client_buffer = b''
        self.server = server
        self.addr = addr
        self.log_prefix = f"{addr[0]}:{addr[1]}"
        logger.info(f"Nueva conexión de {self.log_prefix}")
        self.target = None
        # close() may be called by this thread and by Server.close(); the lock
        # and flag make sure the connection slot is returned exactly once.
        self._close_lock = threading.Lock()
        self._slot_released = False

    @staticmethod
    def _shutdown_socket(sock):
        """Shut down and close a socket, ignoring errors from a dead peer."""
        if sock is None:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            # Typically the peer is already gone; the descriptor must still
            # be closed below.
            pass
        try:
            sock.close()
        except OSError:
            pass

    def close(self):
        """Close the client and target sockets and free the connection slot.

        Safe to call more than once and from more than one thread.
        """
        logger.info(f"Cerrando conexión {self.log_prefix}")

        with self._close_lock:
            if not self.client_closed:
                self.client_closed = True
                self._shutdown_socket(self.client)
            if not self.target_closed:
                self.target_closed = True
                self._shutdown_socket(self.target)
            release_slot = not self._slot_released
            self._slot_released = True

        # Release only on the first call; a second release would raise the
        # effective connection cap above its configured value.
        if release_slot:
            connection_limit_semaphore.release()

    def _reply(self, payload):
        """Send a short status reply to the client on a best-effort basis."""
        try:
            self.client.sendall(payload)
        except OSError as e:
            logger.debug(f"No se pudo responder a {self.log_prefix}: {e}")

    def run(self):
        """Read the initial request, authorize it and start the tunnel."""
        try:
            # Bound the handshake so a client that never sends anything
            # cannot hold a connection slot indefinitely.
            self.client.settimeout(TIMEOUT)
            self.client_buffer = self.client.recv(BUFLEN)
            if not self.client_buffer:
                return

            headers = self.client_buffer.decode('latin-1')
            logger.debug(f"Headers received from {self.log_prefix}:\n{headers.strip()}")

            host_port = self.find_header(headers, 'X-Real-Host') or DEFAULT_HOST

            if self.find_header(headers, 'X-Split'):
                # The client announced a second chunk; read and discard it.
                self.client.recv(BUFLEN)

            # Back to plain blocking mode for the tunnel phase.
            self.client.settimeout(None)

            if not host_port:
                logger.error(f"Encabezado 'X-Real-Host' no encontrado en la conexión de {self.log_prefix}")
                self._reply(b'HTTP/1.1 400 NoXRealHost!\r\n\r\n')
                return

            passwd = self.find_header(headers, 'X-Pass')
            if PASS:
                if passwd == PASS:
                    logger.info(f"Autenticación exitosa para {self.log_prefix} -> {host_port}")
                    self.method_connect(host_port)
                else:
                    logger.warning(f"Fallo de autenticación para {self.log_prefix} -> {host_port}")
                    self._reply(b'HTTP/1.1 400 WrongPass!\r\n\r\n')
            elif self._is_local_target(host_port):
                logger.info(f"Conexión local permitida para {self.log_prefix} -> {host_port}")
                self.method_connect(host_port)
            else:
                logger.warning(f"Acceso denegado (sin contraseña) para {self.log_prefix} -> {host_port}")
                self._reply(b'HTTP/1.1 403 Forbidden!\r\n\r\n')
        except socket.timeout:
            logger.warning(f"Tiempo de espera agotado esperando la solicitud de {self.log_prefix}")
        except Exception as e:
            logger.error(f"Error inesperado en el hilo de conexión {self.log_prefix}: {e}")
        finally:
            self.close()
            self.server.remove_conn(self)

    def find_header(self, head, header):
        """Return the value of ``header`` in the raw request text, or ''.

        The header name is matched case-insensitively (ASCII letters only).
        The value is the text between ``"<name>: "`` and the next CRLF; a
        header that is not terminated by CRLF is treated as missing.
        """
        try:
            fold = ConnectionHandler._ASCII_LOWER
            needle = (header + ': ').translate(fold)
            start = head.translate(fold).find(needle)
            if start == -1:
                return ''
            rest = head[start + len(needle):]
            end = rest.find('\r\n')
            if end == -1:
                return ''
            return rest[:end]
        except Exception as e:
            logger.error(f"Error al analizar el encabezado '{header}': {e}")
            return ''

    @staticmethod
    def _split_host_port(host_port, default_port=22):
        """Split 'host', 'host:port' or '[ipv6]:port' into (host, port).

        Raises:
            RuntimeError: if the port is not an integer in 0-65535 or the
                bracketed form is malformed.
        """
        text = host_port.strip()
        if text.startswith('['):
            # Bracketed IPv6 literal, optionally followed by ':port'.
            host, closing, rest = text[1:].partition(']')
            if not closing or (rest and not rest.startswith(':')):
                raise RuntimeError(f"Puerto inválido: {text}")
            has_port = bool(rest)
            port_text = rest[1:]
        else:
            host, colon, port_text = text.partition(':')
            has_port = bool(colon)

        if not has_port:
            return host, default_port
        try:
            port = int(port_text)
        except ValueError:
            raise RuntimeError(f"Puerto inválido: {port_text}")
        if not 0 <= port <= 65535:
            raise RuntimeError(f"Puerto inválido: {port_text}")
        return host, port

    @staticmethod
    def _is_local_target(host_port):
        """Return True when the target host is 'localhost' or a 127.x.x.x address.

        The host part is compared as a whole, so names that merely begin
        with '127.0.0.1' or 'localhost' are not treated as local.
        """
        try:
            host, _ = ConnectionHandler._split_host_port(host_port)
        except RuntimeError:
            return False
        if host.lower() == 'localhost':
            return True
        try:
            return socket.inet_pton(socket.AF_INET, host)[0] == 127
        except (OSError, ValueError):
            return False

    def connect_target(self, host_port):
        """Connect to the target, trying each resolved address in turn.

        Raises:
            RuntimeError: on an invalid port, a DNS failure, or when no
                resolved address accepts the connection.
        """
        host, port = self._split_host_port(host_port)

        try:
            addr_info = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
        except socket.gaierror as e:
            raise RuntimeError(f"Error de resolución de DNS para {host}: {e}")

        if PRIORITIZE_IPV4:
            # Stable sort: IPv4 entries first, resolver order otherwise kept.
            prioritized_addrs = sorted(addr_info, key=lambda x: x[0] != socket.AF_INET)
        else:
            prioritized_addrs = addr_info

        logger.info(f"Intentando conectar a {host}:{port}. Direcciones disponibles: {[res[4] for res in prioritized_addrs]}")

        for soc_family, soc_type, proto, _, address in prioritized_addrs:
            address_str = address[0]
            sock = None
            try:
                sock = socket.socket(soc_family, soc_type, proto)
                sock.connect(address)
            except OSError as e:
                logger.warning(f"Error al conectar a {address_str}: {e}. Intentando la siguiente dirección...")
                if sock is not None:
                    sock.close()
                continue
            # Publish the socket only once it is connected.
            self.target = sock
            self.target_closed = False
            logger.info(f"Conexión exitosa a {address_str} (Familia: {soc_family})")
            return

        raise RuntimeError(f"No se pudo establecer una conexión con el host de destino: {host}")

    def method_connect(self, path):
        """Connect to the target, confirm to the client and run the tunnel."""
        try:
            self.connect_target(path)
            self.client.sendall(RESPONSE)
            self.do_connect()
        except RuntimeError as e:
            logger.error(f"Error en la conexión del método CONNECT para {self.log_prefix}: {e}")
            self._reply(b'HTTP/1.1 502 Bad Gateway\r\n\r\n')
        except Exception as e:
            logger.error(f"Error inesperado en method_connect para {self.log_prefix}: {e}")
            self._reply(b'HTTP/1.1 500 Internal Server Error\r\n\r\n')
        finally:
            self.client_buffer = b''

    def do_connect(self):
        """Relay data between client and target until close, error or idle timeout."""
        socs = [self.client, self.target]
        # Counts consecutive loop passes without traffic; select() waits up
        # to 3 seconds per pass.
        count = 0
        while True:
            try:
                readable, _, err = select.select(socs, [], socs, 3)
                if err:
                    break
                for sock in readable:
                    data = sock.recv(BUFLEN)
                    if not data:
                        logger.info(f"Conexión con {self.log_prefix} terminada.")
                        return
                    # sendall() in both directions: a plain send() may write
                    # only part of the buffer and silently drop the rest.
                    if sock is self.target:
                        self.client.sendall(data)
                    else:
                        self.target.sendall(data)
                    count = 0
                if count == TIMEOUT:
                    logger.info(f"Conexión con {self.log_prefix} excedió el tiempo de espera.")
                    return
                count += 1
            except (socket.error, socket.timeout) as e:
                logger.error(f"Error en el túnel de datos para {self.log_prefix}: {e}")
                return
            except Exception as e:
                logger.error(f"Error inesperado en do_connect para {self.log_prefix}: {e}")
                return
- # ==============================================================================
- # LÓGICA PRINCIPAL
- # ==============================================================================
def main():
    """Start the proxy server and keep the process alive until it stops.

    Ctrl+C closes the server and exits with status 0. If the server thread
    ends on its own (for example because no listening socket could be bound),
    the process exits with status 1 instead of idling with nothing listening.
    """
    server = Server(LISTENING_PORT)
    server.start()
    try:
        # join() with a timeout keeps the main thread responsive to Ctrl+C
        # while also noticing when the server thread has finished.
        while server.is_alive():
            server.join(2)
    except KeyboardInterrupt:
        logger.info('Deteniendo el servidor...')
        server.close()
        server.join()
        sys.exit(0)

    logger.critical('El servidor se detuvo inesperadamente.')
    server.close()
    sys.exit(1)


if __name__ == '__main__':
    main()
-
|