feat: sync — push/pull API endpoints
- POST /api/v1/sync/push — accepts ops, assigns revisions, returns accepted list - POST /api/v1/sync/pull — returns ops since given revision with server_revision
This commit is contained in:
parent
10c6d06e38
commit
ad684eb118
|
|
@ -358,14 +358,113 @@ func (s *Server) handleSyncPush(w http.ResponseWriter, r *http.Request) {
|
||||||
if !s.requireAPIKey(w, r) {
|
if !s.requireAPIKey(w, r) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
jsonOK(w, map[string]string{"status": "ok", "message": "push endpoint ready (not yet implemented)"})
|
if r.Method != "POST" {
|
||||||
|
jsonErr(w, 405, "POST required")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req struct {
|
||||||
|
DeviceID string `json:"device_id"`
|
||||||
|
Ops []struct {
|
||||||
|
OpID string `json:"op_id"`
|
||||||
|
EntityType string `json:"entity_type"`
|
||||||
|
EntityID string `json:"entity_id"`
|
||||||
|
OpType string `json:"op_type"`
|
||||||
|
PayloadJSON string `json:"payload_json"`
|
||||||
|
CreatedAt string `json:"created_at"`
|
||||||
|
} `json:"ops"`
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
jsonErr(w, 400, "invalid JSON: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var accepted []string
|
||||||
|
for _, op := range req.Ops {
|
||||||
|
if op.OpID == "" || op.EntityType == "" || op.EntityID == "" || op.OpType == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, err := s.db.Exec(
|
||||||
|
`INSERT OR IGNORE INTO server_ops (op_id, device_id, entity_type, entity_id, op_type, payload_json, created_at)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
||||||
|
op.OpID, req.DeviceID, op.EntityType, op.EntityID, op.OpType, op.PayloadJSON, op.CreatedAt,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Assign revision.
|
||||||
|
res, err := s.db.Exec(
|
||||||
|
"INSERT INTO server_revisions (op_id, device_id) VALUES (?, ?)",
|
||||||
|
op.OpID, req.DeviceID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rev, _ := res.LastInsertId()
|
||||||
|
_ = rev
|
||||||
|
accepted = append(accepted, op.OpID)
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonOK(w, map[string]interface{}{
|
||||||
|
"accepted": accepted,
|
||||||
|
"count": len(accepted),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleSyncPull(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleSyncPull(w http.ResponseWriter, r *http.Request) {
|
||||||
if !s.requireAPIKey(w, r) {
|
if !s.requireAPIKey(w, r) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
jsonOK(w, map[string]string{"status": "ok", "message": "pull endpoint ready (not yet implemented)"})
|
if r.Method != "POST" {
|
||||||
|
jsonErr(w, 405, "POST required")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req struct {
|
||||||
|
SinceRevision int `json:"since_revision"`
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
jsonErr(w, 400, "invalid JSON")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get current server revision.
|
||||||
|
var serverRev int
|
||||||
|
s.db.QueryRow("SELECT COALESCE(MAX(rev), 0) FROM server_revisions").Scan(&serverRev)
|
||||||
|
|
||||||
|
// Get ops since the requested revision.
|
||||||
|
rows, err := s.db.Query(`
|
||||||
|
SELECT so.op_id, so.device_id, so.entity_type, so.entity_id, so.op_type, so.payload_json, so.created_at
|
||||||
|
FROM server_ops so
|
||||||
|
JOIN server_revisions sr ON sr.op_id = so.op_id
|
||||||
|
WHERE sr.rev > ?
|
||||||
|
ORDER BY sr.rev`, req.SinceRevision)
|
||||||
|
if err != nil {
|
||||||
|
jsonErr(w, 500, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
type opDTO struct {
|
||||||
|
OpID string `json:"op_id"`
|
||||||
|
DeviceID string `json:"device_id"`
|
||||||
|
EntityType string `json:"entity_type"`
|
||||||
|
EntityID string `json:"entity_id"`
|
||||||
|
OpType string `json:"op_type"`
|
||||||
|
PayloadJSON string `json:"payload_json"`
|
||||||
|
CreatedAt string `json:"created_at"`
|
||||||
|
}
|
||||||
|
var ops []opDTO
|
||||||
|
for rows.Next() {
|
||||||
|
var o opDTO
|
||||||
|
if err := rows.Scan(&o.OpID, &o.DeviceID, &o.EntityType, &o.EntityID, &o.OpType, &o.PayloadJSON, &o.CreatedAt); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ops = append(ops, o)
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonOK(w, map[string]interface{}{
|
||||||
|
"server_revision": serverRev,
|
||||||
|
"ops": ops,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleBlobs(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleBlobs(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue