from __future__ import annotations

import hashlib
import json
import logging
import os
import re
import shlex
from pathlib import Path
from typing import Any

logger = logging.getLogger(__name__)

# A tilde is treated as a home-directory reference only when it starts a
# whitespace-delimited word and is followed by "/", whitespace or the end of
# the string. Tildes inside words ("HEAD~1", "backup~") are left untouched.
_HOME_TILDE_RE = re.compile(r"(?<!\S)~(?=/|\s|$)")


class PermissionService:
    """Permission-first model - user is the authority.

    Decides whether a shell command or a write path may proceed, must be
    confirmed by the user ("prompt"), or is denied outright ("hard_stop").
    User decisions are persisted in a JSON cache file keyed by a hash of the
    normalized command.
    """

    def __init__(
        self,
        config: dict[str, Any] | None = None,
        cache_file: Path | None = None,
    ) -> None:
        """Create the service.

        Args:
            config: Permission configuration. When falsy (``None`` or empty)
                the configuration is loaded from ``config/permissions.json``.
            cache_file: Explicit location of the decision cache. When omitted
                the location is derived from the ``cache_file`` setting.
        """
        self._config = config or self._load_config()
        self._settings = self._config.get("settings", {})
        self._cache_file = cache_file
        self._categories = self._config.get("command_categories", {})
        self._path_settings = self._config.get("path_settings", {})
        self._legacy_dangerous_commands = self._config.get("dangerous_commands", {})
        self._legacy_sensitive_paths = self._config.get("sensitive_paths", [])

    def _load_config(self) -> dict[str, Any]:
        """Load ``config/permissions.json`` located two directories above this file.

        Returns:
            The parsed configuration, or an empty default configuration when
            the file cannot be read or parsed (a warning is logged).
        """
        try:
            config_path = Path(__file__).parents[2] / "config" / "permissions.json"
            with open(config_path, encoding="utf-8") as f:
                return json.load(f)
        except Exception as e:  # best-effort: a broken config must not crash startup
            logger.warning("Failed to load permissions config: %s", e)
            return {"settings": {}, "command_categories": {}}

    def _get_cache_file(self) -> Path:
        """Return the cache file path (explicit override or settings-derived)."""
        if self._cache_file:
            return self._cache_file
        base_dir = Path(__file__).parents[2]
        cache_relative = self._settings.get(
            "cache_file", "data/runtime/allowed_commands.json"
        )
        return base_dir / cache_relative

    def _load_cache(self) -> dict[str, Any]:
        """Read the decision cache from disk.

        Returns:
            The cached decisions, or an empty cache when the file is missing,
            unreadable, or does not contain a JSON object.
        """
        cache_file = self._get_cache_file()
        try:
            if cache_file.exists():
                with open(cache_file, encoding="utf-8") as f:
                    loaded = json.load(f)
                if isinstance(loaded, dict):
                    return loaded
                # Anything but an object would break the ``.get`` lookups below.
                logger.warning("Failed to load cache: %s", "cache root is not an object")
        except Exception as e:  # best-effort: fall back to an empty cache
            logger.warning("Failed to load cache: %s", e)
        return {"allowed_once": {}, "allowed_always": {}}

    def _save_cache(self, cache: dict[str, Any]) -> None:
        """Write ``cache`` to the cache file, creating parent directories."""
        cache_file = self._get_cache_file()
        cache_file.parent.mkdir(parents=True, exist_ok=True)
        with open(cache_file, "w", encoding="utf-8") as f:
            json.dump(cache, f, indent=2)

    def check_shell_command(
        self,
        task_id: str,
        session_id: str,
        command: str,
    ) -> dict[str, Any]:
        """Check if shell command requires permission.

        Args:
            task_id: Task the command belongs to; "allow once" decisions are
                only honoured for the task that recorded them.
            session_id: Session identifier, echoed back in prompt results.
            command: Raw shell command.

        Returns:
            A dict whose ``decision`` is one of ``"hard_stop"``,
            ``"allowed_always"``, ``"allowed_once"``, ``"allowed"`` (legacy
            configuration only) or ``"prompt"``, plus the normalized command.
        """
        normalized = self._normalize_command(command)

        # Hard stops are evaluated before the cache so that a cached (or
        # hand-edited) "allow" entry can never unlock a denied command.
        if self._is_hard_stop(normalized):
            return {
                "decision": "hard_stop",
                "command": normalized,
                "reason": "Hard stop command - execution denied",
            }

        command_hash = self._hash_command(normalized)
        cache = self._load_cache()

        if command_hash in (cache.get("allowed_always") or {}):
            return {
                "decision": "allowed_always",
                "command": normalized,
                "cached": True,
            }

        # NOTE(review): entries are not removed after a hit, so "once" behaves
        # as "for the rest of this task" - confirm this is intended.
        once_entry = (cache.get("allowed_once") or {}).get(command_hash)
        if isinstance(once_entry, dict) and once_entry.get("task_id") == task_id:
            return {
                "decision": "allowed_once",
                "command": normalized,
                "cached": True,
            }

        # Legacy configuration: only a flat list of dangerous patterns exists.
        if not self._categories and self._legacy_dangerous_commands:
            if self._matches_legacy_dangerous(normalized):
                return {
                    "decision": "prompt",
                    "command": normalized,
                    "category": "legacy_dangerous",
                    "allow_always": False,
                    "task_id": task_id,
                    "session_id": session_id,
                }
            return {
                "decision": "allowed",
                "command": normalized,
                "category": "legacy_safe",
                "task_id": task_id,
                "session_id": session_id,
            }

        # Everything else needs user confirmation; the category decides
        # whether "allow always" may be offered.
        category = self._get_category(normalized)
        can_always = self._categories.get(category, {}).get("allow_always", True)
        return {
            "decision": "prompt",
            "command": normalized,
            "category": category,
            "allow_always": can_always,
            "task_id": task_id,
            "session_id": session_id,
        }

    @staticmethod
    def _is_within(path: str, base: str) -> bool:
        """Return True when ``path`` equals ``base`` or lies below it.

        Both sides are normalized first, so ``..`` segments cannot escape the
        base and ``/tmp/workspace`` is not mistaken for a child of
        ``/tmp/work``.
        """
        if not path or not base:
            return False
        norm_path = os.path.normpath(path)
        norm_base = os.path.normpath(base)
        if norm_path == norm_base:
            return True
        prefix = norm_base if norm_base.endswith(os.sep) else norm_base + os.sep
        return norm_path.startswith(prefix)

    def check_write_path(
        self,
        task_id: str,
        session_id: str,
        path: str,
    ) -> dict[str, Any]:
        """Check if write path requires permission.

        Args:
            task_id: Task identifier, echoed back in prompt results.
            session_id: Session identifier, echoed back in prompt results.
            path: Path about to be written.

        Returns:
            ``{"decision": "allowed", ...}`` or ``{"decision": "prompt", ...}``.
        """
        prompt = {
            "decision": "prompt",
            "path": path,
            "task_id": task_id,
            "session_id": session_id,
        }

        # Legacy configuration: prompt for anything under a sensitive prefix.
        if not self._path_settings and self._legacy_sensitive_paths:
            # Also test the normalized form so "/var/../etc/x" is recognised
            # as living under "/etc".
            candidates = {path, os.path.normpath(path)} if path else {path}
            is_sensitive = any(
                candidate.startswith(sensitive)
                for sensitive in self._legacy_sensitive_paths
                for candidate in candidates
            )
            return prompt if is_sensitive else {"decision": "allowed", "path": path}

        allow_write_paths = self._path_settings.get("allow_write_paths", [])
        if any(
            isinstance(allowed, str) and self._is_within(path, allowed)
            for allowed in allow_write_paths
        ):
            return {"decision": "allowed", "path": path}

        # Otherwise require permission.
        return prompt

    def resolve_permission(
        self,
        task_id: str,
        session_id: str,
        command: str,
        decision: str,
    ) -> dict[str, Any]:
        """Resolve permission decision from user.

        ``"allow_once"`` and ``"allow_always"`` are persisted in the cache;
        ``"deny"`` is acknowledged without being stored; anything else is
        reported back as unknown.

        Returns:
            ``{"status": ..., "command": ...}`` for known decisions, or
            ``{"status": "unknown", "decision": decision}`` otherwise.
        """
        normalized = self._normalize_command(command)
        command_hash = self._hash_command(normalized)
        cache = self._load_cache()

        # User decision -> cache bucket (the bucket name doubles as status).
        # NOTE(review): "allow_with_password" (offered by PermissionRequest)
        # is not handled here, and the category's allow_always flag is not
        # enforced - confirm whether callers cover both.
        buckets = {"allow_once": "allowed_once", "allow_always": "allowed_always"}

        if decision in buckets:
            bucket_name = buckets[decision]
            bucket = cache.get(bucket_name)
            if not isinstance(bucket, dict):
                bucket = cache[bucket_name] = {}
            bucket[command_hash] = {
                "command": normalized,
                "task_id": task_id,
                "session_id": session_id,
            }
            self._save_cache(cache)
            return {"status": bucket_name, "command": normalized}

        if decision == "deny":
            return {"status": "denied", "command": normalized}

        return {"status": "unknown", "decision": decision}

    def clear_cache(self) -> dict[str, Any]:
        """Clear permission cache."""
        self._save_cache({"allowed_once": {}, "allowed_always": {}})
        return {"status": "cache_cleared"}

    def _normalize_command(self, command: str) -> str:
        """Normalize command for consistent hashing.

        With the ``normalize_commands`` setting disabled only surrounding
        whitespace is stripped. Otherwise chain separators are unified (when
        ``split_chained`` is enabled), environment variables and the home
        directory are expanded, and runs of whitespace are collapsed.
        """
        normalized = command.strip()
        if not self._settings.get("normalize_commands", True):
            return normalized

        if self._settings.get("split_chained", True):
            # Treat ";" and "||" like "&&" so equivalent chains hash alike.
            normalized = normalized.replace(";", " && ").replace("||", " && ")

        try:
            normalized = os.path.expandvars(normalized)
        except Exception as e:  # best-effort: keep the unexpanded text
            logger.debug("Failed to expand environment variables: %s", e)

        # Expand only a word-initial "~"; a callable replacement keeps
        # backslashes in the home path from being read as regex escapes.
        home = os.path.expanduser("~")
        normalized = _HOME_TILDE_RE.sub(lambda _match: home, normalized)

        return " ".join(normalized.split())

    def _hash_command(self, command: str) -> str:
        """Generate hash for command (first 16 hex digits of its SHA-256)."""
        return hashlib.sha256(command.encode()).hexdigest()[:16]

    def _matches_legacy_dangerous(self, command: str) -> bool:
        """Return True when any legacy dangerous pattern occurs in ``command``.

        Matching is a case-insensitive substring test.
        """
        cmd_lower = command.lower()
        return any(
            pattern.lower() in cmd_lower
            for pattern in self._legacy_dangerous_commands
        )

    def _is_hard_stop(self, command: str) -> bool:
        """Check if command is hard stop (case-insensitive substring match)."""
        hard_stop_commands = self._categories.get("hard_stop", {}).get("commands", [])
        cmd_lower = command.lower()
        return any(hs.lower() in cmd_lower for hs in hard_stop_commands)

    def _get_category(self, command: str) -> str:
        """Get command category: ``"no_always"`` or ``"normal"``."""
        cmd_lower = command.lower()
        no_always = self._categories.get("no_always", {}).get("commands", [])
        # Lower-case the configured entry as well, matching _is_hard_stop.
        if any(cmd.lower() in cmd_lower for cmd in no_always):
            return "no_always"
        return "normal"


SUDO_COMMANDS = {
    "apt",
    "apt-get",
    "dpkg",
    "yum",
    "dnf",
    "pacman",
    "zypper",
    "systemctl",
    "service",
    "mount",
    "umount",
    "shutdown",
    "reboot",
    "halt",
    "poweroff",
    "useradd",
    "usermod",
    "userdel",
    "groupadd",
    "groupmod",
    "chmod",
    "chown",
    "chgrp",
    "iptables",
    "ufw",
    "kill",
    "killall",
    "pkill",
}


def _requires_sudo(command: str) -> bool:
    """Check if command requires sudo.

    Only the first whitespace-delimited word, lower-cased, is compared
    against ``SUDO_COMMANDS``.
    """
    if not command:
        return False
    words = command.lower().split()
    return bool(words) and words[0] in SUDO_COMMANDS


class PermissionRequest:
    """Permission request to user."""

    def __init__(
        self,
        task_id: str,
        session_id: str,
        command: str,
        category: str = "normal",
        allow_always: bool = True,
    ) -> None:
        """Store the request fields and derive ``requires_password``."""
        self.task_id = task_id
        self.session_id = session_id
        self.command = command
        self.category = category
        self.allow_always = allow_always
        self.requires_password = _requires_sudo(command)

    def to_dict(self) -> dict[str, Any]:
        """Return the request, including its action buttons, as a dict."""
        return {
            "task_id": self.task_id,
            "session_id": self.session_id,
            "command": self.command,
            "category": self.category,
            "allow_always": self.allow_always,
            "requires_password": self.requires_password,
            "buttons": self._get_buttons(),
        }

    def _get_buttons(self) -> list[dict[str, str]]:
        """Build the button list: primary allow, optional allow-always, deny."""
        if self.requires_password:
            primary = {"action": "allow_with_password", "label": "Разрешить с паролем"}
        else:
            primary = {"action": "allow_once", "label": "Разрешить"}

        buttons = [primary]
        if self.allow_always:
            buttons.append({"action": "allow_always", "label": "Разрешить навсегда"})
        buttons.append({"action": "deny", "label": "Запретить"})
        return buttons


class PermissionDecision:
    """Permission decision."""

    def __init__(
        self,
        decision: str,
        command: str | None = None,
        cached: bool = False,
    ) -> None:
        """Store the decision, the command it refers to, and the cache flag."""
        self.decision = decision
        self.command = command
        self.cached = cached