diff --git a/README.md b/README.md index f96eeca..15eb4fb 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ Qwen OAuth + OpenAI-compatible endpoint - хранение токенов в `~/.qwen/oauth_creds.json` - HTTP API сервера - агентный цикл с tool calling -- инструменты: `list_files`, `glob_search`, `grep_text`, `stat_path`, `read_file`, `append_file`, `apply_unified_diff`, `replace_in_file`, `write_file`, `make_directory`, `delete_path`, `move_path`, `copy_path`, `git_status`, `git_diff`, `exec_command` +- инструменты: `list_files`, `glob_search`, `grep_text`, `stat_path`, `read_file`, `web_search`, `append_file`, `apply_unified_diff`, `replace_in_file`, `write_file`, `make_directory`, `delete_path`, `move_path`, `copy_path`, `git_status`, `git_diff`, `exec_command` - Telegram polling без внешних библиотек - JSON-хранилище сессий - API списка и просмотра сессий @@ -38,6 +38,7 @@ Qwen OAuth + OpenAI-compatible endpoint - persistence для chat jobs и pending OAuth flows на стороне `serv` - policy mode для инструментов: `full-access`, `workspace-write`, `read-only` - live approval flow для инструментов через Telegram +- provider-based web search с приоритетом DashScope через Qwen OAuth ## Ограничения текущей реализации @@ -66,6 +67,8 @@ cp serv/.env.example serv/.env - `NEW_QWEN_APPROVAL_TIMEOUT_SECONDS` - сколько сервер ждёт решения по approval - `NEW_QWEN_JOBS_RETENTION_SECONDS` - сколько хранить завершённые/failed jobs - `NEW_QWEN_APPROVALS_RETENTION_SECONDS` - сколько хранить завершённые approvals +- `NEW_QWEN_TAVILY_API_KEY` - опциональный Tavily provider +- `NEW_QWEN_GOOGLE_SEARCH_API_KEY` и `NEW_QWEN_GOOGLE_SEARCH_ENGINE_ID` - опциональный Google Custom Search provider Бот: diff --git a/serv/.env.example b/serv/.env.example index a304aa0..6db4f6b 100644 --- a/serv/.env.example +++ b/serv/.env.example @@ -12,3 +12,6 @@ NEW_QWEN_TOOL_POLICY=full-access NEW_QWEN_APPROVAL_TIMEOUT_SECONDS=3600 NEW_QWEN_JOBS_RETENTION_SECONDS=604800 NEW_QWEN_APPROVALS_RETENTION_SECONDS=604800 +NEW_QWEN_TAVILY_API_KEY= +NEW_QWEN_GOOGLE_SEARCH_API_KEY= +NEW_QWEN_GOOGLE_SEARCH_ENGINE_ID= diff --git a/serv/app.py b/serv/app.py index d44b6a5..8d3e315 100644 --- a/serv/app.py +++ b/serv/app.py @@ -23,7 +23,7 @@ class AppState: self.config = config self.oauth = QwenOAuthManager() self.sessions = SessionStore(config.session_dir) - self.tools = ToolRegistry(config) + self.tools = ToolRegistry(config, self.oauth) self.agent = QwenAgent(config, self.oauth, self.tools) self.jobs = JobStore( config.state_dir / "jobs", diff --git a/serv/config.py b/serv/config.py index 4d8a32e..bbe0f2a 100644 --- a/serv/config.py +++ b/serv/config.py @@ -32,6 +32,9 @@ class ServerConfig: approval_timeout_seconds: int jobs_retention_seconds: int approvals_retention_seconds: int + tavily_api_key: str + google_search_api_key: str + google_search_engine_id: str @classmethod def load(cls) -> "ServerConfig": @@ -80,4 +83,13 @@ class ServerConfig: str(7 * 24 * 3600), ) ), + tavily_api_key=os.environ.get("NEW_QWEN_TAVILY_API_KEY", "").strip(), + google_search_api_key=os.environ.get( + "NEW_QWEN_GOOGLE_SEARCH_API_KEY", + "", + ).strip(), + google_search_engine_id=os.environ.get( + "NEW_QWEN_GOOGLE_SEARCH_ENGINE_ID", + "", + ).strip(), ) diff --git a/serv/tools.py b/serv/tools.py index 8f3c3c2..be5c59c 100644 --- a/serv/tools.py +++ b/serv/tools.py @@ -11,6 +11,8 @@ from pathlib import Path from typing import Any, Callable from config import ServerConfig +from oauth import QwenOAuthManager +from web_search import WebSearchError, WebSearchService class ToolError(RuntimeError): @@ -18,15 +20,17 @@ class ToolError(RuntimeError): class ToolRegistry: - def __init__(self, config: ServerConfig) -> None: + def __init__(self, config: ServerConfig, oauth: QwenOAuthManager) -> None: self.config = config self.workspace_root = config.workspace_root.resolve() + self.web_search = WebSearchService(config, oauth) self._handlers: dict[str, Callable[[dict[str, Any]], dict[str, Any]]] = { "list_files": self.list_files, "glob_search": self.glob_search, "grep_text": self.grep_text, "stat_path": self.stat_path, "read_file": self.read_file, + "web_search": self.web_search_tool, "append_file": self.append_file, "apply_unified_diff": self.apply_unified_diff, "replace_in_file": self.replace_in_file, @@ -116,6 +120,21 @@ class ToolRegistry: }, }, }, + { + "type": "function", + "function": { + "name": "web_search", + "description": "Search the web using configured providers and return summarized results with sources.", + "parameters": { + "type": "object", + "properties": { + "query": {"type": "string"}, + "provider": {"type": "string"}, + }, + "required": ["query"], + }, + }, + }, { "type": "function", "function": { @@ -293,7 +312,16 @@ class ToolRegistry: def _check_policy(self, tool_name: str) -> None: policy = self.config.tool_policy - read_only_tools = {"list_files", "glob_search", "grep_text", "stat_path", "read_file"} + read_only_tools = { + "list_files", + "glob_search", + "grep_text", + "stat_path", + "read_file", + "web_search", + "git_status", + "git_diff", + } shell_tools = {"exec_command"} if policy in {"full-access", "ask-shell", "ask-write", "ask-all"}: return @@ -326,6 +354,8 @@ class ToolRegistry: shell_tools = {"exec_command"} if policy == "ask-all": return True + if tool_name == "web_search": + return True if policy == "ask-shell": return tool_name in shell_tools if policy == "ask-write": @@ -439,6 +469,16 @@ class ToolRegistry: "truncated": truncated, } + def web_search_tool(self, arguments: dict[str, Any]) -> dict[str, Any]: + query = str(arguments["query"]).strip() + provider = arguments.get("provider") + if not query: + raise ToolError("Query cannot be empty") + try: + return self.web_search.search(query, provider_name=provider) + except WebSearchError as exc: + raise ToolError(str(exc)) from exc + def write_file(self, arguments: dict[str, Any]) -> dict[str, Any]: target = self._resolve(arguments["path"]) target.parent.mkdir(parents=True, exist_ok=True) diff --git a/serv/web_search.py b/serv/web_search.py new file mode 100644 index 0000000..87fc5b8 --- /dev/null +++ b/serv/web_search.py @@ -0,0 +1,268 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass +from typing import Any +from urllib import parse, request + +from config import ServerConfig +from oauth import QwenOAuthManager + + +class WebSearchError(RuntimeError): + pass + + +@dataclass(slots=True) +class WebSearchResultItem: + title: str + url: str + content: str = "" + score: float | None = None + published_date: str | None = None + + +@dataclass(slots=True) +class WebSearchResult: + query: str + provider: str + answer: str + results: list[WebSearchResultItem] + + +class BaseWebSearchProvider: + name = "base" + + def is_available(self) -> bool: + raise NotImplementedError + + def search(self, query: str) -> WebSearchResult: + if not self.is_available(): + raise WebSearchError(f"[{self.name}] Provider is not available") + try: + return self.perform_search(query) + except WebSearchError: + raise + except Exception as exc: + raise WebSearchError(f"[{self.name}] Search failed: {exc}") from exc + + def perform_search(self, query: str) -> WebSearchResult: + raise NotImplementedError + + +class DashScopeWebSearchProvider(BaseWebSearchProvider): + name = "dashscope" + + def __init__(self, oauth: QwenOAuthManager) -> None: + self.oauth = oauth + + def is_available(self) -> bool: + try: + creds = self.oauth.load_credentials() + except Exception: + return False + return bool(creds and creds.get("resource_url")) + + def perform_search(self, query: str) -> WebSearchResult: + creds = self.oauth.get_valid_credentials() + access_token = creds.get("access_token") + resource_url = creds.get("resource_url") + if not access_token or not resource_url: + raise WebSearchError("[dashscope] Qwen OAuth credentials are not available") + base_url = resource_url if str(resource_url).startswith("http") else f"https://{resource_url}" + api_endpoint = base_url.rstrip("/") + "/api/v1/indices/plugin/web_search" + payload = json.dumps({"uq": query, "page": 1, "rows": 10}).encode("utf-8") + req = request.Request( + api_endpoint, + data=payload, + headers={ + "Content-Type": "application/json", + "Authorization": f"Bearer {access_token}", + }, + method="POST", + ) + with request.urlopen(req, timeout=60) as response: + data = json.loads(response.read().decode("utf-8")) + if data.get("status") != 0: + raise WebSearchError( + f"[dashscope] API error: {data.get('message') or 'unknown error'}" + ) + docs = (data.get("data") or {}).get("docs") or [] + results = [ + WebSearchResultItem( + title=item.get("title") or "Untitled", + url=item.get("url") or "", + content=item.get("snippet") or "", + score=item.get("_score"), + published_date=item.get("timestamp_format"), + ) + for item in docs + ] + return WebSearchResult( + query=query, + provider=self.name, + answer="", + results=results, + ) + + +class TavilyWebSearchProvider(BaseWebSearchProvider): + name = "tavily" + + def __init__(self, api_key: str) -> None: + self.api_key = api_key + + def is_available(self) -> bool: + return bool(self.api_key) + + def perform_search(self, query: str) -> WebSearchResult: + payload = json.dumps( + { + "api_key": self.api_key, + "query": query, + "search_depth": "advanced", + "max_results": 5, + "include_answer": True, + } + ).encode("utf-8") + req = request.Request( + "https://api.tavily.com/search", + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with request.urlopen(req, timeout=60) as response: + data = json.loads(response.read().decode("utf-8")) + results = [ + WebSearchResultItem( + title=item.get("title") or "Untitled", + url=item.get("url") or "", + content=item.get("content") or "", + score=item.get("score"), + published_date=item.get("published_date"), + ) + for item in data.get("results") or [] + ] + return WebSearchResult( + query=query, + provider=self.name, + answer=(data.get("answer") or "").strip(), + results=results, + ) + + +class GoogleWebSearchProvider(BaseWebSearchProvider): + name = "google" + + def __init__(self, api_key: str, search_engine_id: str) -> None: + self.api_key = api_key + self.search_engine_id = search_engine_id + + def is_available(self) -> bool: + return bool(self.api_key and self.search_engine_id) + + def perform_search(self, query: str) -> WebSearchResult: + params = parse.urlencode( + { + "key": self.api_key, + "cx": self.search_engine_id, + "q": query, + "num": "10", + "safe": "medium", + } + ) + url = f"https://www.googleapis.com/customsearch/v1?{params}" + with request.urlopen(url, timeout=60) as response: + data = json.loads(response.read().decode("utf-8")) + results = [ + WebSearchResultItem( + title=item.get("title") or "Untitled", + url=item.get("link") or "", + content=item.get("snippet") or "", + ) + for item in data.get("items") or [] + ] + return WebSearchResult( + query=query, + provider=self.name, + answer="", + results=results, + ) + + +class WebSearchService: + def __init__(self, config: ServerConfig, oauth: QwenOAuthManager) -> None: + self.providers: dict[str, BaseWebSearchProvider] = { + "dashscope": DashScopeWebSearchProvider(oauth), + "tavily": TavilyWebSearchProvider(config.tavily_api_key), + "google": GoogleWebSearchProvider( + config.google_search_api_key, + config.google_search_engine_id, + ), + } + + def list_available_providers(self) -> list[str]: + return [ + provider_name + for provider_name, provider in self.providers.items() + if provider.is_available() + ] + + def search(self, query: str, provider_name: str | None = None) -> dict[str, Any]: + available = { + name: provider + for name, provider in self.providers.items() + if provider.is_available() + } + if not available: + raise WebSearchError("No web search providers are available") + if provider_name: + provider = available.get(provider_name) + if not provider: + raise WebSearchError( + f'Provider "{provider_name}" is not available. Available: {", ".join(sorted(available))}' + ) + else: + provider = available.get("dashscope") or next(iter(available.values())) + result = provider.search(query) + return self._format_result(result) + + def _format_result(self, result: WebSearchResult) -> dict[str, Any]: + sources = [{"title": item.title, "url": item.url} for item in result.results] + if result.answer.strip(): + content = result.answer.strip() + if sources: + source_lines = "\n".join( + f"[{index + 1}] {source['title']} ({source['url']})" + for index, source in enumerate(sources) + ) + content += f"\n\nSources:\n{source_lines}" + else: + blocks: list[str] = [] + for index, item in enumerate(result.results[:5], start=1): + parts = [f"{index}. {item.title}"] + if item.content: + parts.append(item.content.strip()) + parts.append(f"Source: {item.url}") + if item.published_date: + parts.append(f"Published: {item.published_date}") + blocks.append("\n".join(parts)) + content = "\n\n".join(blocks) + if content: + content += "\n\nNote: Use web_fetch-like follow-up tooling for deeper page content." + return { + "query": result.query, + "provider": result.provider, + "content": content or f'No search results found for "{result.query}".', + "sources": sources, + "results": [ + { + "title": item.title, + "url": item.url, + "content": item.content, + "score": item.score, + "published_date": item.published_date, + } + for item in result.results + ], + }