verstak/internal/core/sync/sync.go

156 lines
4.0 KiB
Go

package sync
import (
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"verstak/internal/core/storage"
	"verstak/internal/core/util"
)
// Entity kinds that a sync operation can refer to. Per the original note,
// the string values match the activity target names.
const (
	// EntityNode identifies a node.
	EntityNode = "node"
	// EntityNote identifies a note.
	EntityNote = "note"
	// EntityFile identifies a file.
	EntityFile = "file"
	// EntityFolder identifies a folder.
	EntityFolder = "folder"
	// EntityAction identifies an action.
	EntityAction = "action"
	// EntityWorklog identifies a worklog entry.
	EntityWorklog = "worklog"
)
// Kinds of change a sync operation can describe.
const (
	// OpCreate marks the creation of an entity.
	OpCreate = "create"
	// OpUpdate marks a modification of an entity.
	OpUpdate = "update"
	// OpDelete marks the removal of an entity.
	OpDelete = "delete"
	// OpMove marks a move of an entity.
	OpMove = "move"
)
// Op is one row of the sync_ops table: a single recorded change to an entity.
type Op struct {
	// ID is the row identifier. RecordOp stores the same generated value in
	// both ID and OpID.
	ID string `json:"id"`
	// OpID is the operation identifier that MarkPushed and MarkApplied match on.
	OpID string `json:"op_id"`
	// DeviceID is the device the operation was recorded on.
	DeviceID string `json:"device_id,omitempty"`
	// EntityType is the kind of entity the operation targets.
	EntityType string `json:"entity_type"`
	// EntityID identifies the targeted entity.
	EntityID string `json:"entity_id"`
	// OpType is the kind of change.
	OpType string `json:"op_type"`
	// PayloadJSON is the JSON-encoded payload; RecordOp leaves it empty when
	// no payload was supplied.
	PayloadJSON string `json:"payload_json"`
	// CreatedAt is the creation time; RecordOp writes it as a UTC RFC 3339 string.
	CreatedAt string `json:"created_at"`
	// PushedAt is nil while the pushed_at column is NULL.
	PushedAt *string `json:"pushed_at,omitempty"`
}
// Service writes sync operations to the local database and reads or updates
// their bookkeeping on behalf of one device.
type Service struct {
	// db is the storage handle every query in this type goes through.
	db *storage.DB
	// deviceID is stamped on recorded ops and used as the sync_state key.
	deviceID string
}
// NewService returns a Service that uses db for storage and identifies
// itself as deviceID.
func NewService(db *storage.DB, deviceID string) *Service {
	svc := new(Service)
	svc.db = db
	svc.deviceID = deviceID
	return svc
}
// RecordOp inserts one row into the local sync_ops table describing a change
// of kind opType to the entity (entityType, entityID).
//
// A fresh identifier from util.UUID7 is stored in both the id and op_id
// columns, the service's device id in device_id, and the current UTC time
// (RFC 3339) in created_at. A non-nil payload is JSON-encoded into
// payload_json; a nil payload is stored as the empty string.
//
// The JSON encoding error or the database error is returned unchanged; if
// encoding fails nothing is written.
func (s *Service) RecordOp(entityType, entityID, opType string, payload interface{}) error {
	const insertOp = `INSERT INTO sync_ops (id, op_id, device_id, entity_type, entity_id, op_type, payload_json, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`

	opID := util.UUID7()
	createdAt := time.Now().UTC().Format(time.RFC3339)

	// Stays empty (not "null") when the caller passes no payload.
	encoded := ""
	if payload != nil {
		raw, marshalErr := json.Marshal(payload)
		if marshalErr != nil {
			return marshalErr
		}
		encoded = string(raw)
	}

	// opID is bound twice: once for id and once for op_id.
	_, execErr := s.db.Exec(insertOp,
		opID, opID, s.deviceID, entityType, entityID, opType, encoded, createdAt)
	return execErr
}
// GetUnpushedOps loads every sync_ops row whose pushed_at is NULL, ordered by
// created_at. It returns the query error, or whatever scanOps reports while
// reading the rows.
func (s *Service) GetUnpushedOps() ([]Op, error) {
	const selectPending = `SELECT id, op_id, device_id, entity_type, entity_id, op_type, payload_json, created_at, pushed_at
FROM sync_ops WHERE pushed_at IS NULL ORDER BY created_at`

	rows, queryErr := s.db.Query(selectPending)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	ops, scanErr := scanOps(rows)
	return ops, scanErr
}
// MarkPushed sets pushed_at to the current UTC time (RFC 3339) on every
// sync_ops row whose op_id is listed in opIDs. All ids in one call receive
// the same timestamp.
//
// The ids are updated one statement at a time. The first failing update
// aborts the loop and its error is returned; this method does not undo the
// updates that already succeeded.
func (s *Service) MarkPushed(opIDs []string) error {
	const setPushed = "UPDATE sync_ops SET pushed_at=? WHERE op_id=?"

	stamp := time.Now().UTC().Format(time.RFC3339)
	for i := 0; i < len(opIDs); i++ {
		if _, err := s.db.Exec(setPushed, stamp, opIDs[i]); err != nil {
			return err
		}
	}
	return nil
}
// MarkApplied sets applied_at to the current UTC time (RFC 3339) on every
// sync_ops row whose op_id is listed in opIDs. All ids in one call receive
// the same timestamp.
//
// The ids are updated one statement at a time. The first failing update
// aborts the loop and its error is returned; this method does not undo the
// updates that already succeeded.
func (s *Service) MarkApplied(opIDs []string) error {
	const setApplied = "UPDATE sync_ops SET applied_at=? WHERE op_id=?"

	appliedAt := time.Now().UTC().Format(time.RFC3339)
	for idx := range opIDs {
		_, execErr := s.db.Exec(setApplied, appliedAt, opIDs[idx])
		if execErr != nil {
			return execErr
		}
	}
	return nil
}
// GetState reads this device's row from sync_state.
//
// It returns the stored server URL, API key, last pushed revision and last
// sync time; a NULL last_sync_at is reported as the empty string. When the
// device has no row yet, all results are zero values and err is nil.
//
// On any other failure every result is a zero value and err carries the
// underlying error, so callers never see partially scanned data next to a
// non-nil error.
func (s *Service) GetState() (serverURL, apiKey string, lastPushRev int, lastSyncAt string, err error) {
	const query = `SELECT server_url, api_key, last_push_rev, COALESCE(last_sync_at,'') FROM sync_state WHERE device_id=?`

	err = s.db.QueryRow(query, s.deviceID).
		Scan(&serverURL, &apiKey, &lastPushRev, &lastSyncAt)
	switch {
	case err == nil:
		return serverURL, apiKey, lastPushRev, lastSyncAt, nil
	case errors.Is(err, sql.ErrNoRows):
		// errors.Is rather than == so a wrapped sentinel is still treated as
		// "no state saved yet".
		return "", "", 0, "", nil
	default:
		return "", "", 0, "", err
	}
}
// SetState stores the server URL and API key for this device in sync_state.
//
// A new row is inserted with last_push_rev 0 and an empty last_sync_at. If a
// row for the device already exists, only server_url and api_key are
// overwritten.
func (s *Service) SetState(serverURL, apiKey string) error {
	const upsertState = `INSERT INTO sync_state (device_id, server_url, api_key, last_push_rev, last_sync_at)
VALUES (?, ?, ?, 0, '')
ON CONFLICT(device_id) DO UPDATE SET server_url=excluded.server_url, api_key=excluded.api_key`

	if _, err := s.db.Exec(upsertState, s.deviceID, serverURL, apiKey); err != nil {
		return err
	}
	return nil
}
// --- helpers ---
// scanOps reads every remaining row from rows into an Op. Each row must
// provide nine columns in this order: id, op_id, device_id, entity_type,
// entity_id, op_type, payload_json, created_at, pushed_at.
//
// A NULL pushed_at leaves Op.PushedAt nil. A scan failure returns nil and the
// error; otherwise the collected ops are returned together with rows.Err().
// The caller remains responsible for closing rows.
func scanOps(rows *sql.Rows) ([]Op, error) {
	var ops []Op
	for rows.Next() {
		var (
			op     Op
			pushed sql.NullString
		)
		err := rows.Scan(
			&op.ID, &op.OpID, &op.DeviceID,
			&op.EntityType, &op.EntityID, &op.OpType,
			&op.PayloadJSON, &op.CreatedAt, &pushed,
		)
		if err != nil {
			return nil, err
		}
		if pushed.Valid {
			// Copy into a per-row variable so each Op owns its own pointer.
			ts := pushed.String
			op.PushedAt = &ts
		}
		ops = append(ops, op)
	}
	return ops, rows.Err()
}
// Blank reference that keeps the fmt import in use: nothing else visible in
// this file calls into fmt, and the Go compiler rejects unused imports.
var _ = fmt.Sprintf