33 lines
959 B
Python
33 lines
959 B
Python
from __future__ import annotations
|
|
|
|
from typing import Callable
|
|
|
|
from app.core.contracts import RuntimeEvent
|
|
from app.events.event_store import SQLiteEventStore
|
|
|
|
|
|
# Signature of an event-bus subscriber: called with one RuntimeEvent, returns nothing.
Subscriber = Callable[[RuntimeEvent], None]
|
|
|
|
|
|
class EventBus:
    """Per-task ordered event publishing with durable storage.

    Each published event is first handed to the backing event store and then
    delivered synchronously to every registered subscriber, in the order the
    subscribers were registered.
    """

    def __init__(self, event_store: SQLiteEventStore) -> None:
        """Create a bus backed by *event_store*.

        Args:
            event_store: Store used to append events and to answer the
                sequence and per-task listing queries.
        """
        self._store = event_store
        self._subscribers: list[Subscriber] = []

    def next_sequence(self, task_id: str) -> int:
        """Return the store's latest sequence number for *task_id* plus one."""
        return self._store.get_latest_sequence(task_id) + 1

    def publish(self, event: RuntimeEvent) -> RuntimeEvent:
        """Append *event* to the store, notify subscribers, and return it.

        The event is appended to the store before any subscriber runs.
        Exceptions raised by a subscriber are not caught here: they propagate
        to the caller and the remaining subscribers are not notified.

        Args:
            event: The event to store and deliver.

        Returns:
            The same *event* object that was passed in.
        """
        self._store.append(event)
        # Iterate over a snapshot: a subscriber may call subscribe() while it
        # is being notified, and walking the live list would deliver the
        # in-flight event to the newcomer (or never terminate if a callback
        # keeps registering new subscribers).
        for subscriber in tuple(self._subscribers):
            subscriber(event)
        return event

    def subscribe(self, subscriber: Subscriber) -> None:
        """Register *subscriber* to be called with every event published later.

        A subscriber registered while a publish is in progress first receives
        the next published event, not the one currently being delivered.
        """
        self._subscribers.append(subscriber)

    def list_for_task(self, task_id: str) -> list[RuntimeEvent]:
        """Return the events the store reports for *task_id*."""
        return self._store.list_for_task(task_id)
|