diff --git a/windows_fixer.py b/windows_fixer.py index ff1e71a..6a5df01 100644 --- a/windows_fixer.py +++ b/windows_fixer.py @@ -19,7 +19,7 @@ APP_ID = "WindowsFixer" APP_VERSION = "v1.0.0" -BUILD_DATE = date.today().isoformat() +BUILD_DATE = os.environ.get("BUILD_DATE", date.today().isoformat()) DONATE_PAGE = "https://buymeacoffee.com/ilukezippo" GITHUB_PAGE = "https://github.com/ilukezippo/Windows_Fixer" @@ -31,6 +31,9 @@ def resource_path(relative_path: str) -> str: + # Reject path traversal attempts + if ".." in relative_path or relative_path.startswith(("/", "\\")): + raise ValueError(f"Invalid resource path: {relative_path}") # 1) Prefer files next to the exe (portable) try: if getattr(sys, "frozen", False): @@ -155,14 +158,24 @@ def _settings_path(): return os.path.join(folder, "settings.json") +def _sanitize_settings(data): + if not isinstance(data, dict): + return {"always_admin": False, "language": "en"} + lang = data.get("language", "en") + return { + "always_admin": bool(data.get("always_admin", False)), + "language": lang if lang in ("en", "ar") else "en", + } + + def load_settings(): path = _settings_path() try: if os.path.exists(path): with open(path, "r", encoding="utf-8") as f: - return json.load(f) - except Exception: - pass + return _sanitize_settings(json.load(f)) + except (json.JSONDecodeError, OSError, ValueError) as e: + print(f"[WARN] Failed to load settings: {e}") return {"always_admin": False, "language": "en"} @@ -171,19 +184,19 @@ def save_settings(data: dict): try: with open(path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) - except Exception: - pass + except OSError as e: + print(f"[WARN] Failed to save settings: {e}") def is_admin() -> bool: try: return ctypes.windll.shell32.IsUserAnAdmin() != 0 - except Exception: + except (AttributeError, OSError): return False def relaunch_as_admin(): - params = " ".join([f'"{a}"' for a in sys.argv]) + params = subprocess.list2cmdline(sys.argv) ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, params, None, 0) sys.exit(0) @@ -200,15 +213,25 @@ def list_drives(): return drives or ["C:"] +def _rmtree_onerror(func, path, exc_info): + """Collect rmtree errors instead of silently ignoring them.""" + safe_rmtree._errors.append(path) + + def safe_rmtree(path: str, log_cb): try: if os.path.isdir(path) and not os.path.islink(path): - shutil.rmtree(path, ignore_errors=True) + safe_rmtree._errors = [] + shutil.rmtree(path, onerror=_rmtree_onerror) + if safe_rmtree._errors: + log_cb(f"[WARN] Could not delete {len(safe_rmtree._errors)} item(s) under {path}") else: try: os.remove(path) - except Exception: - pass + except PermissionError: + log_cb(f"[WARN] Permission denied: {path}") + except Exception as e: + log_cb(f"[WARN] Could not delete {path}: {e}") except Exception as e: log_cb(f"[WARN] Could not delete {path}: {e}") @@ -260,30 +283,30 @@ class CommandRunner: def __init__(self, log_cb): self.log_cb = log_cb self.current_proc = None - self._cancel_all = False - self._skip_step = False + self._cancel_all = threading.Event() + self._skip_step = threading.Event() def reset_all(self): - self._cancel_all = False - self._skip_step = False + self._cancel_all.clear() + self._skip_step.clear() self.current_proc = None def request_cancel_all(self): - self._cancel_all = True + self._cancel_all.set() self._terminate_current("Cancel requested") def request_skip_step(self): - self._skip_step = True + self._skip_step.set() self._terminate_current("Skip requested") def reset_flags_for_step(self): - self._skip_step = False + self._skip_step.clear() def cancel_all_requested(self) -> bool: - return self._cancel_all + return self._cancel_all.is_set() def skip_requested(self) -> bool: - return self._skip_step + return self._skip_step.is_set() def _terminate_current(self, reason: str): if self.current_proc: @@ -314,10 +337,10 @@ def run_cmd(self, cmd): try: if self.current_proc.stdout: for line in self.current_proc.stdout: - if self._cancel_all: + if self._cancel_all.is_set(): self._terminate_current("Cancel requested") break - if self._skip_step: + if self._skip_step.is_set(): self._terminate_current("Skip requested") break self.log_cb(line.rstrip("\n")) @@ -340,10 +363,10 @@ def run_cmd(self, cmd): self.current_proc = None - if self._cancel_all: + if self._cancel_all.is_set(): self.log_cb("=== STOPPED (cancel) ===\n") return "cancel" - if self._skip_step: + if self._skip_step.is_set(): self.log_cb("=== SKIPPED ===\n") return "skip" @@ -457,7 +480,8 @@ def worker(): with urllib.request.urlopen(req, timeout=10) as r: data = json.loads(r.read().decode("utf-8", "replace")) - tag = str(data.get("tag_name") or data.get("name") or "").strip() + raw_tag = str(data.get("tag_name") or data.get("name") or "").strip() + tag = re.sub(r"[^v0-9.]", "", raw_tag) if tag and self._parse_ver_tuple(tag) > self._parse_ver_tuple(APP_VERSION): def _ask(): @@ -1003,11 +1027,14 @@ def step_sfc(self): def step_chkdsk(self): drive = self.var_drive.get().strip().upper() + if not re.fullmatch(r"[A-Z]:", drive): + self.enqueue_log(f"[ERROR] Invalid drive letter: {drive}") + return "error" mode = self.var_chkdsk_mode.get() if mode == "scan": return self.run_command_step(["chkdsk", drive]) self.enqueue_log("[INFO] Fix mode may require restart (Windows may ask to schedule it).") - return self.run_command_step(["cmd", "/c", f"chkdsk {drive} /f"]) + return self.run_command_step(["chkdsk", drive, "/f"]) def step_reset_network(self): r = self.run_command_step(["netsh", "winsock", "reset"])