ducklm/app/core/intent_parser.py

131 lines
4.2 KiB
Python

from __future__ import annotations
import re
from typing import Any
from app.core.contracts import ExecutionDirective
SHELL_PREFIXES = (
"run ",
"execute ",
"launch ",
"запусти ",
"выполни ",
"выполнить ",
)
MEMORY_STORE_PATTERNS = (
r"запомни\s+(.+)",
r"сохрани\s+(.+)",
r"запиши\s+(.+)",
r"remember\s+(.+)",
r"save\s+(.+)",
)
MEMORY_SEARCH_PATTERNS = (
r"найди\s+(.+)",
r"вспомни\s+(.+)",
r"search\s+(.+)",
r"find\s+(.+)",
)
SYSTEM_COMMAND_PATTERNS = (
(
re.compile(r"(сколько\s+времени\s+запущен|как\s+долго\s+работает|uptime|аптайм)", re.IGNORECASE),
"uptime -p",
),
(
re.compile(r"(проверь|посмотри|покажи).*(обновлен|обновл|updates|upgradable)", re.IGNORECASE),
"apt list --upgradable",
),
)
class IntentParser:
"""Extracts explicit tool intents from natural-language task text."""
def __init__(self) -> None:
self._store_patterns = [re.compile(p, re.IGNORECASE) for p in MEMORY_STORE_PATTERNS]
self._search_patterns = [re.compile(p, re.IGNORECASE) for p in MEMORY_SEARCH_PATTERNS]
def parse(self, task_input: str) -> ExecutionDirective | None:
normalized = task_input.strip()
lowered = normalized.lower()
if matched := self._match_patterns(self._store_patterns, normalized):
return ExecutionDirective(
type="tool",
payload={
"tool": "memory_insert",
"args": {
"text": matched.group(1).strip(),
"kind": "fact",
"source": "user",
},
},
requires_permission=False,
confidence=0.85,
reason="User explicitly requested to store in memory.",
)
if matched := self._match_patterns(self._search_patterns, normalized):
return ExecutionDirective(
type="tool",
payload={
"tool": "memory_search",
"args": {"query": matched.group(1).strip()},
},
requires_permission=False,
confidence=0.85,
reason="User explicitly requested to search memory.",
)
for pattern, command in SYSTEM_COMMAND_PATTERNS:
if pattern.search(normalized):
return ExecutionDirective(
type="tool",
payload={
"tool": "shell_exec",
"args": {"command": command},
},
requires_permission=True,
confidence=0.9,
reason="User explicitly requested local system information.",
)
for prefix in SHELL_PREFIXES:
if lowered.startswith(prefix):
command = normalized[len(prefix) :].strip()
if command:
return ExecutionDirective(
type="tool",
payload={
"tool": "shell_exec",
"args": {"command": command},
},
requires_permission=True,
confidence=0.92,
reason="Natural-language task explicitly requested shell execution.",
)
quoted = re.match(r"^`(.+)`$", normalized)
if quoted:
return ExecutionDirective(
type="tool",
payload={
"tool": "shell_exec",
"args": {"command": quoted.group(1)},
},
requires_permission=True,
confidence=0.75,
reason="Backticked input treated as direct shell command.",
)
return None
def _match_patterns(self, patterns: list[re.Pattern], text: str):
for pattern in patterns:
if match := pattern.match(text):
return match
return None