verstak/internal/core/sync/sync.go

187 lines
5.1 KiB
Go

package sync
import (
"database/sql"
"encoding/json"
"fmt"
"time"
"verstak/internal/core/storage"
"verstak/internal/core/util"
)
// Entity types (matches activity targets).
const (
EntityNode = "node"
EntityNote = "note"
EntityFile = "file"
EntityFolder = "folder"
EntityAction = "action"
EntityWorklog = "worklog"
)
// Op types.
const (
OpCreate = "create"
OpUpdate = "update"
OpDelete = "delete"
OpMove = "move"
)
// Op represents a sync operation.
type Op struct {
ID string `json:"id"`
OpID string `json:"op_id"`
ServerSequence int `json:"server_sequence,omitempty"`
DeviceID string `json:"device_id,omitempty"`
EntityType string `json:"entity_type"`
EntityID string `json:"entity_id"`
OpType string `json:"op_type"`
PayloadJSON string `json:"payload_json"`
CreatedAt string `json:"created_at"`
PushedAt *string `json:"pushed_at,omitempty"`
}
// Service records and manages sync operations.
type Service struct {
db *storage.DB
deviceID string
}
// NewService creates a sync service.
func NewService(db *storage.DB, deviceID string) *Service {
return &Service{db: db, deviceID: deviceID}
}
// RecordOp writes a sync operation to the local sync_ops table.
func (s *Service) RecordOp(entityType, entityID, opType string, payload interface{}) error {
id := util.UUID7()
now := time.Now().UTC().Format(time.RFC3339)
var payloadStr string
if payload != nil {
b, err := json.Marshal(payload)
if err != nil {
return err
}
payloadStr = string(b)
}
_, err := s.db.Exec(
`INSERT INTO sync_ops (id, op_id, device_id, entity_type, entity_id, op_type, payload_json, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
id, id, s.deviceID, entityType, entityID, opType, payloadStr, now,
)
return err
}
// RecordRemoteOp writes a remote op to the local sync_ops table (already applied server-side).
func (s *Service) RecordRemoteOp(op Op) error {
now := time.Now().UTC().Format(time.RFC3339)
_, err := s.db.Exec(
`INSERT OR IGNORE INTO sync_ops (id, op_id, device_id, entity_type, entity_id, op_type, payload_json, created_at, pushed_at, applied_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
op.OpID+"-remote", op.OpID, op.DeviceID, op.EntityType, op.EntityID, op.OpType, op.PayloadJSON, op.CreatedAt, now, now,
)
return err
}
// GetUnpushedOps returns ops that have not been pushed yet.
func (s *Service) GetUnpushedOps() ([]Op, error) {
rows, err := s.db.Query(
`SELECT id, op_id, device_id, entity_type, entity_id, op_type, payload_json, created_at, pushed_at
FROM sync_ops WHERE pushed_at IS NULL ORDER BY created_at`)
if err != nil {
return nil, err
}
defer rows.Close()
return scanOps(rows)
}
// MarkPushed marks ops as pushed to server.
func (s *Service) MarkPushed(opIDs []string) error {
now := time.Now().UTC().Format(time.RFC3339)
for _, id := range opIDs {
_, err := s.db.Exec("UPDATE sync_ops SET pushed_at=? WHERE op_id=?", now, id)
if err != nil {
return err
}
}
return nil
}
// MarkApplied marks remote ops as applied locally.
func (s *Service) MarkApplied(opIDs []string) error {
now := time.Now().UTC().Format(time.RFC3339)
for _, id := range opIDs {
_, err := s.db.Exec("UPDATE sync_ops SET applied_at=? WHERE op_id=?", now, id)
if err != nil {
return err
}
}
return nil
}
// GetState returns the current sync state.
func (s *Service) GetState() (serverURL, apiKey string, lastPullSeq int, lastSyncAt string, err error) {
err = s.db.QueryRow(
`SELECT server_url, api_key, last_pull_seq, COALESCE(last_sync_at,'') FROM sync_state WHERE device_id=?`,
s.deviceID).Scan(&serverURL, &apiKey, &lastPullSeq, &lastSyncAt)
if err == sql.ErrNoRows {
return "", "", 0, "", nil
}
return
}
// SetState saves sync connection state.
func (s *Service) SetState(serverURL, apiKey string) error {
_, err := s.db.Exec(
`INSERT INTO sync_state (device_id, server_url, api_key, last_pull_seq, last_sync_at)
VALUES (?, ?, ?, 0, '')
ON CONFLICT(device_id) DO UPDATE SET
server_url=excluded.server_url,
api_key=excluded.api_key`,
s.deviceID, serverURL, apiKey,
)
return err
}
// SetLastPullSeq updates the last pulled server sequence.
func (s *Service) SetLastPullSeq(seq int) error {
_, err := s.db.Exec("UPDATE sync_state SET last_pull_seq=? WHERE device_id=?", seq, s.deviceID)
return err
}
// GetDeviceID returns the device ID used by this service.
func (s *Service) GetDeviceID() string {
return s.deviceID
}
// SetLastSyncAt updates the last sync timestamp.
func (s *Service) SetLastSyncAt(t string) error {
_, err := s.db.Exec("UPDATE sync_state SET last_sync_at=? WHERE device_id=?", t, s.deviceID)
return err
}
// --- helpers ---
func scanOps(rows *sql.Rows) ([]Op, error) {
var out []Op
for rows.Next() {
var o Op
var pushedAt sql.NullString
if err := rows.Scan(&o.ID, &o.OpID, &o.DeviceID, &o.EntityType, &o.EntityID,
&o.OpType, &o.PayloadJSON, &o.CreatedAt, &pushedAt); err != nil {
return nil, err
}
if pushedAt.Valid {
o.PushedAt = &pushedAt.String
}
out = append(out, o)
}
return out, rows.Err()
}
// MustVar ensures the package is not considered unused.
var _ = fmt.Sprintf