From ea9054ad802c44ec3cddd2ea704bc8fe1d387c16 Mon Sep 17 00:00:00 2001 From: mirivlad Date: Tue, 7 Apr 2026 17:23:17 +0800 Subject: [PATCH] Add retention cleanup for persisted state --- README.md | 2 ++ serv/.env.example | 2 ++ serv/app.py | 25 +++++++++++++++++++++++-- serv/approvals.py | 29 ++++++++++++++++++++++++++++- serv/config.py | 11 +++++++++++ serv/jobs.py | 32 +++++++++++++++++++++++++++++++- 6 files changed, 97 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index f09850c..44d54aa 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,8 @@ cp serv/.env.example serv/.env `ask-write` - shell и записи только после подтверждения `ask-all` - любой инструмент только после подтверждения - `NEW_QWEN_APPROVAL_TIMEOUT_SECONDS` - сколько сервер ждёт решения по approval +- `NEW_QWEN_JOBS_RETENTION_SECONDS` - сколько хранить завершённые/failed jobs +- `NEW_QWEN_APPROVALS_RETENTION_SECONDS` - сколько хранить завершённые approvals Бот: diff --git a/serv/.env.example b/serv/.env.example index 49c4ee9..a304aa0 100644 --- a/serv/.env.example +++ b/serv/.env.example @@ -10,3 +10,5 @@ NEW_QWEN_MAX_FILE_READ_BYTES=200000 NEW_QWEN_MAX_COMMAND_OUTPUT_BYTES=12000 NEW_QWEN_TOOL_POLICY=full-access NEW_QWEN_APPROVAL_TIMEOUT_SECONDS=3600 +NEW_QWEN_JOBS_RETENTION_SECONDS=604800 +NEW_QWEN_APPROVALS_RETENTION_SECONDS=604800 diff --git a/serv/app.py b/serv/app.py index cf1fdbc..d44b6a5 100644 --- a/serv/app.py +++ b/serv/app.py @@ -25,8 +25,14 @@ class AppState: self.sessions = SessionStore(config.session_dir) self.tools = ToolRegistry(config) self.agent = QwenAgent(config, self.oauth, self.tools) - self.jobs = JobStore(config.state_dir / "jobs") - self.approvals = ApprovalStore(config.state_dir / "approvals") + self.jobs = JobStore( + config.state_dir / "jobs", + retention_seconds=config.jobs_retention_seconds, + ) + self.approvals = ApprovalStore( + config.state_dir / "approvals", + retention_seconds=config.approvals_retention_seconds, + ) self.pending_flows_path = config.state_dir / "oauth_flows.json" self.pending_device_flows: dict[str, DeviceAuthState] = self._load_pending_flows() self.lock = threading.Lock() @@ -56,6 +62,9 @@ class AppState: continue if state.expires_at > now: flows[flow_id] = state + if len(flows) != len(payload): + self.pending_device_flows = flows + self._save_pending_flows() return flows def _save_pending_flows(self) -> None: @@ -78,6 +87,17 @@ class AppState: encoding="utf-8", ) + def cleanup_state(self) -> None: + self.jobs.cleanup() + self.approvals.cleanup() + with self.lock: + self.pending_device_flows = { + flow_id: state + for flow_id, state in self.pending_device_flows.items() + if state.expires_at > time.time() + } + self._save_pending_flows() + def auth_status(self) -> dict[str, Any]: creds = self.oauth.load_credentials() if not creds: @@ -368,6 +388,7 @@ def main() -> None: config.state_dir.mkdir(parents=True, exist_ok=True) httpd = ThreadingHTTPServer((config.host, config.port), RequestHandler) httpd.app_state = AppState(config) # type: ignore[attr-defined] + httpd.app_state.cleanup_state() # type: ignore[attr-defined] print(f"new-qwen serv listening on http://{config.host}:{config.port}") httpd.serve_forever() diff --git a/serv/approvals.py b/serv/approvals.py index 8639d10..c68af12 100644 --- a/serv/approvals.py +++ b/serv/approvals.py @@ -9,13 +9,15 @@ from typing import Any class ApprovalStore: - def __init__(self, base_dir: Path) -> None: + def __init__(self, base_dir: Path, retention_seconds: int) -> None: self.base_dir = base_dir self.base_dir.mkdir(parents=True, exist_ok=True) + self.retention_seconds = retention_seconds self._approvals: dict[str, dict[str, Any]] = {} self._conditions: dict[str, threading.Condition] = {} self._lock = threading.RLock() self._load_existing() + self.cleanup() def _path(self, approval_id: str) -> Path: return self.base_dir / f"{approval_id}.json" @@ -27,11 +29,19 @@ class ApprovalStore: ) def _load_existing(self) -> None: + now = time.time() for path in sorted(self.base_dir.glob("*.json")): try: approval = json.loads(path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): continue + updated_at = float(approval.get("updated_at") or approval.get("created_at") or now) + if now - updated_at > self.retention_seconds: + try: + path.unlink() + except OSError: + pass + continue if approval.get("status") == "pending": approval["status"] = "rejected" approval["reason"] = "Server restarted while waiting for approval" @@ -128,3 +138,20 @@ class ApprovalStore: condition.wait(timeout=remaining) return approval.copy() + def cleanup(self) -> None: + now = time.time() + with self._lock: + expired = [ + approval_id + for approval_id, approval in self._approvals.items() + if now + - float(approval.get("updated_at") or approval.get("created_at") or now) + > self.retention_seconds + ] + for approval_id in expired: + self._approvals.pop(approval_id, None) + self._conditions.pop(approval_id, None) + try: + self._path(approval_id).unlink() + except OSError: + pass diff --git a/serv/config.py b/serv/config.py index 9f6b23e..4d8a32e 100644 --- a/serv/config.py +++ b/serv/config.py @@ -30,6 +30,8 @@ class ServerConfig: max_command_output_bytes: int tool_policy: str approval_timeout_seconds: int + jobs_retention_seconds: int + approvals_retention_seconds: int @classmethod def load(cls) -> "ServerConfig": @@ -69,4 +71,13 @@ class ServerConfig: approval_timeout_seconds=int( os.environ.get("NEW_QWEN_APPROVAL_TIMEOUT_SECONDS", "3600") ), + jobs_retention_seconds=int( + os.environ.get("NEW_QWEN_JOBS_RETENTION_SECONDS", str(7 * 24 * 3600)) + ), + approvals_retention_seconds=int( + os.environ.get( + "NEW_QWEN_APPROVALS_RETENTION_SECONDS", + str(7 * 24 * 3600), + ) + ), ) diff --git a/serv/jobs.py b/serv/jobs.py index f0c684c..cc6d616 100644 --- a/serv/jobs.py +++ b/serv/jobs.py @@ -9,12 +9,14 @@ from typing import Any class JobStore: - def __init__(self, base_dir: Path) -> None: + def __init__(self, base_dir: Path, retention_seconds: int) -> None: self.base_dir = base_dir self.base_dir.mkdir(parents=True, exist_ok=True) + self.retention_seconds = retention_seconds self._jobs: dict[str, dict[str, Any]] = {} self._lock = threading.RLock() self._load_existing() + self.cleanup() def _path(self, job_id: str) -> Path: return self.base_dir / f"{job_id}.json" @@ -26,15 +28,27 @@ class JobStore: ) def _load_existing(self) -> None: + now = time.time() for path in sorted(self.base_dir.glob("*.json")): try: payload = json.loads(path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): continue + updated_at = float(payload.get("updated_at") or payload.get("created_at") or now) + if now - updated_at > self.retention_seconds: + try: + path.unlink() + except OSError: + pass + continue if payload.get("status") in {"queued", "running"}: payload["status"] = "failed" payload["error"] = "Server restarted while job was running" payload["updated_at"] = time.time() + path.write_text( + json.dumps(payload, ensure_ascii=False, indent=2), + encoding="utf-8", + ) self._jobs[payload["job_id"]] = payload def create(self, session_id: str, user_id: str, message: str) -> dict[str, Any]: @@ -104,3 +118,19 @@ class JobStore: job["error"] = error_message job["updated_at"] = time.time() self._save_job(job) + + def cleanup(self) -> None: + now = time.time() + with self._lock: + expired = [ + job_id + for job_id, job in self._jobs.items() + if now - float(job.get("updated_at") or job.get("created_at") or now) + > self.retention_seconds + ] + for job_id in expired: + self._jobs.pop(job_id, None) + try: + self._path(job_id).unlink() + except OSError: + pass