107 lines
3.3 KiB
Python
107 lines
3.3 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
class JobStore:
|
|
def __init__(self, base_dir: Path) -> None:
|
|
self.base_dir = base_dir
|
|
self.base_dir.mkdir(parents=True, exist_ok=True)
|
|
self._jobs: dict[str, dict[str, Any]] = {}
|
|
self._lock = threading.RLock()
|
|
self._load_existing()
|
|
|
|
def _path(self, job_id: str) -> Path:
|
|
return self.base_dir / f"{job_id}.json"
|
|
|
|
def _save_job(self, job: dict[str, Any]) -> None:
|
|
self._path(job["job_id"]).write_text(
|
|
json.dumps(job, ensure_ascii=False, indent=2),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
def _load_existing(self) -> None:
|
|
for path in sorted(self.base_dir.glob("*.json")):
|
|
try:
|
|
payload = json.loads(path.read_text(encoding="utf-8"))
|
|
except (OSError, json.JSONDecodeError):
|
|
continue
|
|
if payload.get("status") in {"queued", "running"}:
|
|
payload["status"] = "failed"
|
|
payload["error"] = "Server restarted while job was running"
|
|
payload["updated_at"] = time.time()
|
|
self._jobs[payload["job_id"]] = payload
|
|
|
|
def create(self, session_id: str, user_id: str, message: str) -> dict[str, Any]:
|
|
job_id = uuid.uuid4().hex
|
|
job = {
|
|
"job_id": job_id,
|
|
"session_id": session_id,
|
|
"user_id": user_id,
|
|
"message": message,
|
|
"status": "queued",
|
|
"created_at": time.time(),
|
|
"updated_at": time.time(),
|
|
"events": [],
|
|
"answer": None,
|
|
"usage": None,
|
|
"error": None,
|
|
}
|
|
with self._lock:
|
|
self._jobs[job_id] = job
|
|
self._save_job(job)
|
|
return job
|
|
|
|
def get(self, job_id: str) -> dict[str, Any] | None:
|
|
with self._lock:
|
|
job = self._jobs.get(job_id)
|
|
if not job:
|
|
return None
|
|
return {
|
|
key: (value.copy() if isinstance(value, list) else value)
|
|
for key, value in job.items()
|
|
}
|
|
|
|
def append_event(self, job_id: str, event: dict[str, Any]) -> None:
|
|
with self._lock:
|
|
job = self._jobs[job_id]
|
|
seq = len(job["events"]) + 1
|
|
job["events"].append({"seq": seq, **event})
|
|
job["updated_at"] = time.time()
|
|
self._save_job(job)
|
|
|
|
def set_status(self, job_id: str, status: str) -> None:
|
|
with self._lock:
|
|
job = self._jobs[job_id]
|
|
job["status"] = status
|
|
job["updated_at"] = time.time()
|
|
self._save_job(job)
|
|
|
|
def finish(
|
|
self,
|
|
job_id: str,
|
|
*,
|
|
answer: str,
|
|
usage: dict[str, Any] | None,
|
|
) -> None:
|
|
with self._lock:
|
|
job = self._jobs[job_id]
|
|
job["status"] = "completed"
|
|
job["answer"] = answer
|
|
job["usage"] = usage
|
|
job["updated_at"] = time.time()
|
|
self._save_job(job)
|
|
|
|
def fail(self, job_id: str, error_message: str) -> None:
|
|
with self._lock:
|
|
job = self._jobs[job_id]
|
|
job["status"] = "failed"
|
|
job["error"] = error_message
|
|
job["updated_at"] = time.time()
|
|
self._save_job(job)
|