61 lines
2.1 KiB
Python
61 lines
2.1 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
import shlex
|
|
from typing import Any
|
|
|
|
from app.core.permission_service import PermissionService
|
|
|
|
|
|
class CommandAnalyzer:
    """Deterministic shell action analyzer for structured critic evidence.

    A command line is split into segments on ``&&`` and ``;``. For each
    segment a leading ``sudo`` (together with its options) is removed and the
    remaining command is passed to the permission service. Segments the
    service reports as requiring sudo are then classified by whether the
    original segment was actually prefixed with ``sudo``.
    """

    # NOTE: the split is purely textual, so a separator inside a quoted string
    # also splits the command; ``||`` and ``|`` are not treated as separators.
    _SPLIT_RE = re.compile(r"\s*(?:&&|;)\s*")

    # sudo short options that consume the following word as their value when
    # the value is not attached to the option itself (``-u root``).
    # A bare ``-h`` means "help", but followed by a word it names a host.
    _SUDO_SHORT_VALUE_OPTS = frozenset("CDghpRrTtUu")

    # Long spellings of the same options when written with a space instead of
    # ``=`` (``--user root``). ``--user=root`` is a single word and needs no
    # special handling.
    _SUDO_LONG_VALUE_OPTS = frozenset(
        {
            "--chdir",
            "--chroot",
            "--close-from",
            "--command-timeout",
            "--group",
            "--host",
            "--other-user",
            "--prompt",
            "--role",
            "--type",
            "--user",
        }
    )

    def __init__(self, permission_service: PermissionService) -> None:
        """Store the permission service used to classify each segment."""
        self._permission_service = permission_service

    def analyze(self, command: str, task_id: str, session_id: str) -> dict[str, Any]:
        """Classify every segment of *command* by its privilege needs.

        Args:
            command: Raw shell command line, possibly chained with ``&&``/``;``.
            task_id: Forwarded unchanged to the permission service.
            session_id: Forwarded unchanged to the permission service.

        Returns:
            A dict with the keys ``type`` (``"privilege_scope_error"`` when at
            least one segment needs root but is not run through sudo, else
            ``"ok"``), ``command`` (the input), ``segments`` (the raw
            segments), and ``root_required_segments``, ``elevated_segments``
            and ``unelevated_root_segments`` (lists of sudo-stripped segments).
        """
        segments = [
            segment.strip()
            for segment in self._SPLIT_RE.split(command)
            if segment.strip()
        ]
        root_required: list[str] = []
        elevated: list[str] = []
        unelevated_root: list[str] = []

        for segment in segments:
            normalized, is_elevated = self._strip_sudo(segment)
            check = self._permission_service.check_shell_command(
                task_id=task_id,
                session_id=session_id,
                command=normalized,
            )
            # Only the "requires_sudo" entry of the service result is used.
            if not check.get("requires_sudo"):
                continue
            root_required.append(normalized)
            if is_elevated:
                elevated.append(normalized)
            else:
                unelevated_root.append(normalized)

        diagnosis_type = "privilege_scope_error" if unelevated_root else "ok"
        return {
            "type": diagnosis_type,
            "command": command,
            "segments": segments,
            "root_required_segments": root_required,
            "elevated_segments": elevated,
            "unelevated_root_segments": unelevated_root,
        }

    def _strip_sudo(self, segment: str) -> tuple[str, bool]:
        """Remove a leading ``sudo`` and its options from *segment*.

        Returns:
            ``(command, is_elevated)``. When the segment does not start with
            ``sudo`` it is returned unchanged with ``False``. Otherwise the
            words after sudo's own options are re-quoted with ``shlex.quote``
            and joined with single spaces, and the flag is ``True``.
        """
        try:
            parts = shlex.split(segment)
        except ValueError:
            # Unbalanced quotes: the segment cannot be tokenized reliably, so
            # leave the text untouched and only report whether its first
            # whitespace-delimited word is sudo.
            return segment, segment.split(maxsplit=1)[:1] == ["sudo"]
        if not parts or parts[0] != "sudo":
            return segment, False

        index = 1
        while index < len(parts) and parts[index].startswith("-"):
            option = parts[index]
            index += 1
            if option == "--":
                # Explicit end of sudo options; everything after is the command.
                break
            if index < len(parts) and self._sudo_option_takes_next_word(option):
                # Skip the option's value so it is not mistaken for the command.
                index += 1
        return " ".join(shlex.quote(part) for part in parts[index:]), True

    @classmethod
    def _sudo_option_takes_next_word(cls, option: str) -> bool:
        """Return True when sudo *option* consumes the following word as its value."""
        if option.startswith("--"):
            # "--user=root" carries its value inline and is not in the set.
            return option in cls._SUDO_LONG_VALUE_OPTS
        cluster = option[1:]
        for position, flag in enumerate(cluster):
            if flag in cls._SUDO_SHORT_VALUE_OPTS:
                # In a cluster such as "-Eu" the first value-taking flag ends
                # option parsing: the value is the rest of the cluster if any
                # ("-uroot"), otherwise the next word ("-u root").
                return position == len(cluster) - 1
        return False
|