import json
from pathlib import Path
from typing import Any

import aiosqlite
from pydantic import BaseModel

from duck_core.tasks.store import utc_now


class Event(BaseModel):
    """A single row of the ``events`` table with its JSON payload decoded."""

    id: int
    task_id: str
    sequence: int
    event_type: str
    payload: dict[str, Any]
    created_at: str


def _event_from_row(row: aiosqlite.Row) -> Event:
    """Build an ``Event`` from an ``events`` row, decoding ``payload_json``."""
    return Event(
        id=row["id"],
        task_id=row["task_id"],
        sequence=row["sequence"],
        event_type=row["event_type"],
        payload=json.loads(row["payload_json"]),
        created_at=row["created_at"],
    )


class EventStore:
    """SQLite-backed store of events, numbered per task by ``sequence``.

    Every operation opens its own short-lived connection to ``db_path`` and
    calls :meth:`init` first, so the schema is created on demand.
    """

    def __init__(self, db_path: str):
        self.db_path = Path(db_path)

    async def init(self) -> None:
        """Create the parent directory, the ``events`` table and its index.

        All statements use ``if not exists`` / ``exist_ok`` so calling this
        repeatedly is harmless.
        """
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                """
                create table if not exists events (
                    id integer primary key autoincrement,
                    task_id text not null,
                    sequence integer not null,
                    event_type text not null,
                    payload_json text not null,
                    created_at text not null
                )
                """
            )
            await db.execute(
                """
                create unique index if not exists idx_events_task_sequence
                on events(task_id, sequence)
                """
            )
            await db.commit()

    async def append(self, task_id: str, event_type: str, payload: dict[str, Any]) -> Event:
        """Store a new event for ``task_id`` and return it.

        The event receives the next sequence number for its task
        (``max(sequence) + 1``, starting at 1).

        Args:
            task_id: Task the event belongs to.
            event_type: Free-form event type label.
            payload: JSON-serializable event data.

        Returns:
            The stored event, including its row id and assigned sequence.

        Raises:
            TypeError: If ``payload`` cannot be serialized by ``json.dumps``.
        """
        await self.init()
        payload_json = json.dumps(payload)
        created_at = utc_now()
        async with aiosqlite.connect(self.db_path) as db:
            # The next sequence is computed inside the INSERT itself. Reading
            # max(sequence) in a separate SELECT first would let two
            # concurrent appends pick the same number, and the second insert
            # would then violate the unique (task_id, sequence) index.
            cursor = await db.execute(
                """
                insert into events(task_id, sequence, event_type, payload_json, created_at)
                select ?, coalesce(max(sequence), 0) + 1, ?, ?, ?
                from events
                where task_id = ?
                """,
                (task_id, event_type, payload_json, created_at, task_id),
            )
            event_id = cursor.lastrowid
            # Read back the sequence the database assigned; the row is visible
            # to this connection before the commit.
            cursor = await db.execute(
                "select sequence from events where id = ?",
                (event_id,),
            )
            sequence = (await cursor.fetchone())[0]
            await db.commit()
        return Event(
            id=event_id,
            task_id=task_id,
            sequence=sequence,
            event_type=event_type,
            payload=payload,
            created_at=created_at,
        )

    async def list_events(self, task_id: str) -> list[Event]:
        """Return every event of ``task_id`` ordered by ascending sequence."""
        await self.init()
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute(
                "select * from events where task_id = ? order by sequence",
                (task_id,),
            )
            rows = await cursor.fetchall()
        return [_event_from_row(row) for row in rows]

    async def list_by_type(self, event_type: str, limit: int = 100) -> list[Event]:
        """Return up to ``limit`` events of ``event_type``, highest id first.

        Args:
            event_type: Event type label to filter on.
            limit: Maximum number of rows, passed straight to SQL ``limit``.
                NOTE(review): SQLite treats a negative limit as "no limit";
                confirm callers never rely on that.
        """
        await self.init()
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute(
                """
                select * from events
                where event_type = ?
                order by id desc
                limit ?
                """,
                (event_type, limit),
            )
            rows = await cursor.fetchall()
        return [_event_from_row(row) for row in rows]