from __future__ import annotations

import json
import threading
import time
import uuid
from pathlib import Path
from typing import Any


class JobStore:
    """In-memory registry of job records, mirrored to one JSON file per job.

    Each job is a plain dict stored in ``self._jobs`` and persisted as
    ``<base_dir>/<job_id>.json``.  Access to the in-memory map is guarded by
    a re-entrant lock.  Jobs whose last update is older than
    ``retention_seconds`` are dropped from memory and from disk.
    """

    def __init__(self, base_dir: Path, retention_seconds: int) -> None:
        """Create the storage directory, reload persisted jobs, and purge.

        Args:
            base_dir: Directory holding the per-job JSON files; created
                (including parents) when missing.
            retention_seconds: Maximum age, in seconds since the last
                update, before a job is discarded.
        """
        self.base_dir = base_dir
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.retention_seconds = retention_seconds
        self._jobs: dict[str, dict[str, Any]] = {}
        self._lock = threading.RLock()
        self._load_existing()
        self.cleanup()

    def _path(self, job_id: str) -> Path:
        """Return the JSON file path used to persist *job_id*."""
        return self.base_dir / f"{job_id}.json"

    @staticmethod
    def _write_json(path: Path, payload: dict[str, Any]) -> None:
        """Serialize *payload* to *path* without exposing a partial file.

        The text is written to a sibling ``<name>.tmp`` file (which the
        ``*.json`` glob used at load time does not match) and then renamed
        over the target, so an interrupted write cannot truncate the
        previously saved record.

        Raises:
            OSError: If writing or renaming fails; the temp file is removed
                on a best-effort basis before re-raising.
            TypeError: If *payload* is not JSON-serializable.
        """
        # Serialize first so a serialization error never touches the disk.
        text = json.dumps(payload, ensure_ascii=False, indent=2)
        tmp_path = path.with_name(f"{path.name}.tmp")
        try:
            tmp_path.write_text(text, encoding="utf-8")
            tmp_path.replace(path)
        except OSError:
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise

    def _save_job(self, job: dict[str, Any]) -> None:
        """Persist *job* to the file derived from its ``job_id``."""
        self._write_json(self._path(job["job_id"]), job)

    @staticmethod
    def _last_touched(job: dict[str, Any], now: float) -> float:
        """Return the job's ``updated_at``, else ``created_at``, else *now*.

        Raises:
            TypeError, ValueError: If the stored timestamp is not numeric.
        """
        return float(job.get("updated_at") or job.get("created_at") or now)

    def _commit(self, job: dict[str, Any]) -> None:
        """Stamp ``updated_at`` on *job* and persist it (lock must be held)."""
        job["updated_at"] = time.time()
        self._save_job(job)

    def _load_existing(self) -> None:
        """Reload persisted jobs from ``base_dir`` into memory.

        Unreadable or malformed files are skipped and left on disk.  Files
        older than the retention window are deleted.  Jobs still marked
        ``queued`` or ``running`` are flipped to ``failed``, because nothing
        is executing them after a restart.
        """
        now = time.time()
        for path in sorted(self.base_dir.glob("*.json")):
            try:
                payload = json.loads(path.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # ValueError covers both json.JSONDecodeError and the
                # UnicodeDecodeError raised for non-UTF-8 content.
                continue
            if not isinstance(payload, dict):
                continue

            try:
                last_touched = self._last_touched(payload, now)
            except (TypeError, ValueError):
                continue
            if now - last_touched > self.retention_seconds:
                try:
                    path.unlink()
                except OSError:
                    pass
                continue

            job_id = payload.get("job_id")
            if not isinstance(job_id, str) or not job_id:
                # Without a usable id the record cannot be addressed.
                continue

            if payload.get("status") in ("queued", "running"):
                payload["status"] = "failed"
                payload["error"] = "Server restarted while job was running"
                payload["updated_at"] = time.time()
                try:
                    self._write_json(path, payload)
                except OSError:
                    # Best effort: the in-memory record is still marked
                    # failed, and the file is re-marked on the next start.
                    pass
            self._jobs[job_id] = payload

    def create(self, session_id: str, user_id: str, message: str) -> dict[str, Any]:
        """Register and persist a new job in the ``queued`` state.

        Returns:
            The newly created job record (the same dict held by the store).
        """
        job_id = uuid.uuid4().hex
        now = time.time()  # single timestamp so both fields agree
        job = {
            "job_id": job_id,
            "session_id": session_id,
            "user_id": user_id,
            "message": message,
            "status": "queued",
            "created_at": now,
            "updated_at": now,
            "events": [],
            "answer": None,
            "usage": None,
            "error": None,
        }
        with self._lock:
            self._jobs[job_id] = job
            self._save_job(job)
        return job

    def get(self, job_id: str) -> dict[str, Any] | None:
        """Return a snapshot of the job, or ``None`` when it is unknown.

        The snapshot is a new dict whose list values (such as ``events``)
        are copied; the items inside those lists and any other nested
        values are shared with the stored record.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if not job:
                return None
            return {
                key: (value.copy() if isinstance(value, list) else value)
                for key, value in job.items()
            }

    def append_event(self, job_id: str, event: dict[str, Any]) -> None:
        """Append *event* to the job with a 1-based ``seq`` and persist.

        Raises:
            KeyError: If *job_id* is not in the store.
        """
        with self._lock:
            job = self._jobs[job_id]
            seq = len(job["events"]) + 1
            job["events"].append({"seq": seq, **event})
            self._commit(job)

    def set_status(self, job_id: str, status: str) -> None:
        """Set the job's ``status`` and persist.

        Raises:
            KeyError: If *job_id* is not in the store.
        """
        with self._lock:
            job = self._jobs[job_id]
            job["status"] = status
            self._commit(job)

    def finish(
        self,
        job_id: str,
        *,
        answer: str,
        usage: dict[str, Any] | None,
    ) -> None:
        """Mark the job ``completed`` with its answer and usage, and persist.

        Raises:
            KeyError: If *job_id* is not in the store.
        """
        with self._lock:
            job = self._jobs[job_id]
            job["status"] = "completed"
            job["answer"] = answer
            job["usage"] = usage
            self._commit(job)

    def fail(self, job_id: str, error_message: str) -> None:
        """Mark the job ``failed`` with *error_message* and persist.

        Raises:
            KeyError: If *job_id* is not in the store.
        """
        with self._lock:
            job = self._jobs[job_id]
            job["status"] = "failed"
            job["error"] = error_message
            self._commit(job)

    def cleanup(self) -> None:
        """Drop jobs older than the retention window from memory and disk.

        File removal is best-effort: a file that is missing or cannot be
        deleted is ignored.
        """
        now = time.time()
        with self._lock:
            expired = [
                job_id
                for job_id, job in self._jobs.items()
                if now - self._last_touched(job, now) > self.retention_seconds
            ]
            for job_id in expired:
                self._jobs.pop(job_id, None)
                try:
                    self._path(job_id).unlink()
                except OSError:
                    pass