ducklm/app/core/permission_service.py

342 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import shlex
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
class PermissionService:
"""Permission-first model - user is the authority."""
def __init__(self, config: dict[str, Any] | None = None, cache_file: Path | None = None):
self._config = config or self._load_config()
self._settings = self._config.get("settings", {})
self._cache_file = cache_file
self._categories = self._config.get("command_categories", {})
self._path_settings = self._config.get("path_settings", {})
self._legacy_dangerous_commands = self._config.get("dangerous_commands", {})
self._legacy_sensitive_paths = self._config.get("sensitive_paths", [])
def _load_config(self) -> dict[str, Any]:
try:
config_path = Path(__file__).parents[2] / "config" / "permissions.json"
with open(config_path) as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load permissions config: {e}")
return {"settings": {}, "command_categories": {}}
def _get_cache_file(self) -> Path:
if self._cache_file:
return self._cache_file
base_dir = Path(__file__).parents[2]
cache_relative = self._settings.get("cache_file", "data/runtime/allowed_commands.json")
return base_dir / cache_relative
def _load_cache(self) -> dict[str, Any]:
cache_file = self._get_cache_file()
try:
if cache_file.exists():
with open(cache_file) as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load cache: {e}")
return {"allowed_once": {}, "allowed_always": {}}
def _save_cache(self, cache: dict[str, Any]) -> None:
cache_file = self._get_cache_file()
cache_file.parent.mkdir(parents=True, exist_ok=True)
with open(cache_file, "w") as f:
json.dump(cache, f, indent=2)
def check_shell_command(
self,
task_id: str,
session_id: str,
command: str,
) -> dict[str, Any]:
"""Check if shell command requires permission."""
normalized = self._normalize_command(command)
command_hash = self._hash_command(normalized)
cache = self._load_cache()
# Check cache first
if command_hash in cache.get("allowed_always", {}):
return {
"decision": "allowed_always",
"command": normalized,
"cached": True,
}
if command_hash in cache.get("allowed_once", {}):
cached = cache["allowed_once"][command_hash]
if cached.get("task_id") == task_id:
return {
"decision": "allowed_once",
"command": normalized,
"cached": True,
}
# Check hard stop
if self._is_hard_stop(normalized):
return {
"decision": "hard_stop",
"command": normalized,
"reason": "Hard stop command - execution denied",
}
if not self._categories and self._legacy_dangerous_commands:
if self._matches_legacy_dangerous(normalized):
return {
"decision": "prompt",
"command": normalized,
"category": "legacy_dangerous",
"allow_always": False,
"task_id": task_id,
"session_id": session_id,
}
return {
"decision": "allowed",
"command": normalized,
"category": "legacy_safe",
"task_id": task_id,
"session_id": session_id,
}
# Check no_always category
category = self._get_category(normalized)
can_always = self._categories.get(category, {}).get("allow_always", True)
# Need user confirmation
return {
"decision": "prompt",
"command": normalized,
"category": category,
"allow_always": can_always,
"task_id": task_id,
"session_id": session_id,
}
def check_write_path(
self,
task_id: str,
session_id: str,
path: str,
) -> dict[str, Any]:
"""Check if write path requires permission."""
if not self._path_settings and self._legacy_sensitive_paths:
if any(path.startswith(sensitive) for sensitive in self._legacy_sensitive_paths):
return {
"decision": "prompt",
"path": path,
"task_id": task_id,
"session_id": session_id,
}
return {"decision": "allowed", "path": path}
allow_write_paths = self._path_settings.get("allow_write_paths", [])
# Check if path is in allowed list
for allowed in allow_write_paths:
if path.startswith(allowed):
return {"decision": "allowed", "path": path}
# Otherwise require permission
return {
"decision": "prompt",
"path": path,
"task_id": task_id,
"session_id": session_id,
}
def resolve_permission(
self,
task_id: str,
session_id: str,
command: str,
decision: str,
) -> dict[str, Any]:
"""Resolve permission decision from user."""
normalized = self._normalize_command(command)
command_hash = self._hash_command(normalized)
cache = self._load_cache()
if decision == "allow_once":
cache.setdefault("allowed_once", {})[command_hash] = {
"command": normalized,
"task_id": task_id,
"session_id": session_id,
}
self._save_cache(cache)
return {"status": "allowed_once", "command": normalized}
elif decision == "allow_always":
cache.setdefault("allowed_always", {})[command_hash] = {
"command": normalized,
"task_id": task_id,
"session_id": session_id,
}
self._save_cache(cache)
return {"status": "allowed_always", "command": normalized}
elif decision == "deny":
return {"status": "denied", "command": normalized}
return {"status": "unknown", "decision": decision}
def clear_cache(self) -> dict[str, Any]:
"""Clear permission cache."""
cache = {"allowed_once": {}, "allowed_always": {}}
self._save_cache(cache)
return {"status": "cache_cleared"}
def _normalize_command(self, command: str) -> str:
"""Normalize command for consistent hashing."""
if not self._settings.get("normalize_commands", True):
return command.strip()
normalized = command.strip()
# Split chained commands if enabled
if self._settings.get("split_chained", True):
# Replace ; and || with && for splitting
normalized = normalized.replace(";", " && ")
normalized = normalized.replace("||", " && ")
# Resolve environment variables
try:
normalized = os.path.expandvars(normalized)
except:
pass
# Resolve home directory
normalized = normalized.replace("~", os.path.expanduser("~"))
# Remove extra whitespace
normalized = " ".join(normalized.split())
return normalized
def _hash_command(self, command: str) -> str:
"""Generate hash for command."""
return hashlib.sha256(command.encode()).hexdigest()[:16]
def _matches_legacy_dangerous(self, command: str) -> bool:
cmd_lower = command.lower()
for pattern in self._legacy_dangerous_commands:
if pattern.lower() in cmd_lower:
return True
return False
def _is_hard_stop(self, command: str) -> bool:
"""Check if command is hard stop."""
hard_stop_commands = self._categories.get("hard_stop", {}).get("commands", [])
cmd_lower = command.lower()
for hs in hard_stop_commands:
if hs.lower() in cmd_lower:
return True
return False
def _get_category(self, command: str) -> str:
"""Get command category."""
cmd_lower = command.lower()
# Check no_always category
no_always = self._categories.get("no_always", {}).get("commands", [])
for cmd in no_always:
if cmd in cmd_lower:
return "no_always"
# Default to normal
return "normal"
SUDO_COMMANDS = {
"apt", "apt-get", "dpkg", "yum", "dnf", "pacman", "zypper",
"systemctl", "service", "mount", "umount",
"shutdown", "reboot", "halt", "poweroff",
"useradd", "usermod", "userdel", "groupadd", "groupmod",
"chmod", "chown", "chgrp",
"iptables", "ufw",
"kill", "killall", "pkill",
}
def _requires_sudo(command: str) -> bool:
"""Check if command requires sudo."""
if not command:
return False
cmd_lower = command.lower().strip()
first_word = cmd_lower.split()[0] if cmd_lower.split() else ""
return first_word in SUDO_COMMANDS
class PermissionRequest:
"""Permission request to user."""
def __init__(
self,
task_id: str,
session_id: str,
command: str,
category: str = "normal",
allow_always: bool = True,
) -> None:
self.task_id = task_id
self.session_id = session_id
self.command = command
self.category = category
self.allow_always = allow_always
self.requires_password = _requires_sudo(command)
def to_dict(self) -> dict[str, Any]:
return {
"task_id": self.task_id,
"session_id": self.session_id,
"command": self.command,
"category": self.category,
"allow_always": self.allow_always,
"requires_password": self.requires_password,
"buttons": self._get_buttons(),
}
def _get_buttons(self) -> list[dict[str, str]]:
buttons = [{"action": "deny", "label": "Запретить"}]
if self.allow_always:
buttons.insert(0, {"action": "allow_always", "label": "Разрешить навсегда"})
if self.requires_password:
buttons.insert(0, {"action": "allow_with_password", "label": "Разрешить с паролем"})
else:
buttons.insert(0, {"action": "allow_once", "label": "Разрешить"})
return buttons
class PermissionDecision:
"""Permission decision."""
def __init__(
self,
decision: str,
command: str | None = None,
cached: bool = False,
) -> None:
self.decision = decision
self.command = command
self.cached = cached