feat: retry transient sync requests
This commit is contained in:
parent
04ce8c5bf3
commit
9f3448c1ea
|
|
@ -554,6 +554,8 @@ bundled runtime. Это реальный runtime contract для cooperative bun
|
||||||
| `api.files.showInFolder(relativePath)` | ✅ Работает | Показывает vault file/folder в системном файловом менеджере, требует `files.openExternal` |
|
| `api.files.showInFolder(relativePath)` | ✅ Работает | Показывает vault file/folder в системном файловом менеджере, требует `files.openExternal` |
|
||||||
| `api.workbench.openResource(request)` | ✅ Работает | Routes vault resources to `openProviders` |
|
| `api.workbench.openResource(request)` | ✅ Работает | Routes vault resources to `openProviders` |
|
||||||
| `api.workbench.editResource(request)` | ✅ Работает | Same routing, forcing `mode: "edit"` |
|
| `api.workbench.editResource(request)` | ✅ Работает | Same routing, forcing `mode: "edit"` |
|
||||||
|
| `api.sync.now()` | ✅ Работает | Push/pull с bounded retry/backoff для transient HTTP/network failures |
|
||||||
|
| `api.sync.status()` | ✅ Работает | Возвращает configured/connected/error/revoked state, lastError, unpushed count |
|
||||||
| `api.dispose()` | ✅ Работает | Очищает command handlers и event subscriptions текущего API instance |
|
| `api.dispose()` | ✅ Работает | Очищает command handlers и event subscriptions текущего API instance |
|
||||||
|
|
||||||
Ограничения:
|
Ограничения:
|
||||||
|
|
|
||||||
|
|
@ -61,6 +61,10 @@ type Client struct {
|
||||||
HTTP *http.Client
|
HTTP *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var syncRetrySleep = time.Sleep
|
||||||
|
|
||||||
|
const syncHTTPAttempts = 3
|
||||||
|
|
||||||
// NewClient creates a sync client.
|
// NewClient creates a sync client.
|
||||||
func NewClient(serverURL, apiKey, deviceID, vaultRoot string) *Client {
|
func NewClient(serverURL, apiKey, deviceID, vaultRoot string) *Client {
|
||||||
return &Client{
|
return &Client{
|
||||||
|
|
@ -318,7 +322,11 @@ func (c *Client) post(path string, body, result interface{}) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
req, err := http.NewRequest("POST", c.ServerURL+path, &b)
|
payload := b.Bytes()
|
||||||
|
|
||||||
|
var lastErr error
|
||||||
|
for attempt := 1; attempt <= syncHTTPAttempts; attempt++ {
|
||||||
|
req, err := http.NewRequest("POST", c.ServerURL+path, bytes.NewReader(payload))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -327,21 +335,39 @@ func (c *Client) post(path string, body, result interface{}) error {
|
||||||
|
|
||||||
resp, err := c.HTTP.Do(req)
|
resp, err := c.HTTP.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("http: %w", err)
|
lastErr = fmt.Errorf("http: %w", err)
|
||||||
|
if attempt < syncHTTPAttempts {
|
||||||
|
syncRetrySleep(syncBackoffDelay(attempt))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return lastErr
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode >= 400 {
|
if resp.StatusCode >= 400 {
|
||||||
return c.readErrorBody(resp, resp.StatusCode)
|
if isTransientHTTPStatus(resp.StatusCode) && attempt < syncHTTPAttempts {
|
||||||
|
_, _ = io.Copy(io.Discard, resp.Body)
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
syncRetrySleep(syncBackoffDelay(attempt))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
err := c.readErrorBody(resp, resp.StatusCode)
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if result != nil {
|
if result != nil {
|
||||||
return json.NewDecoder(resp.Body).Decode(result)
|
err := json.NewDecoder(resp.Body).Decode(result)
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
_ = resp.Body.Close()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
return lastErr
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) get(path string, result interface{}) error {
|
func (c *Client) get(path string, result interface{}) error {
|
||||||
|
var lastErr error
|
||||||
|
for attempt := 1; attempt <= syncHTTPAttempts; attempt++ {
|
||||||
req, err := http.NewRequest("GET", c.ServerURL+path, nil)
|
req, err := http.NewRequest("GET", c.ServerURL+path, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -350,19 +376,49 @@ func (c *Client) get(path string, result interface{}) error {
|
||||||
|
|
||||||
resp, err := c.HTTP.Do(req)
|
resp, err := c.HTTP.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("http: %w", err)
|
lastErr = fmt.Errorf("http: %w", err)
|
||||||
|
if attempt < syncHTTPAttempts {
|
||||||
|
syncRetrySleep(syncBackoffDelay(attempt))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return lastErr
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode >= 400 {
|
if resp.StatusCode >= 400 {
|
||||||
return c.readErrorBody(resp, resp.StatusCode)
|
if isTransientHTTPStatus(resp.StatusCode) && attempt < syncHTTPAttempts {
|
||||||
|
_, _ = io.Copy(io.Discard, resp.Body)
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
syncRetrySleep(syncBackoffDelay(attempt))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
err := c.readErrorBody(resp, resp.StatusCode)
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if result != nil {
|
if result != nil {
|
||||||
return json.NewDecoder(resp.Body).Decode(result)
|
err := json.NewDecoder(resp.Body).Decode(result)
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
_ = resp.Body.Close()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
return lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func syncBackoffDelay(attempt int) time.Duration {
|
||||||
|
return time.Duration(attempt) * 250 * time.Millisecond
|
||||||
|
}
|
||||||
|
|
||||||
|
func isTransientHTTPStatus(statusCode int) bool {
|
||||||
|
switch statusCode {
|
||||||
|
case http.StatusRequestTimeout, http.StatusTooManyRequests, http.StatusInternalServerError,
|
||||||
|
http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) readErrorBody(resp *http.Response, statusCode int) error {
|
func (c *Client) readErrorBody(resp *http.Response, statusCode int) error {
|
||||||
buf := make([]byte, 4096)
|
buf := make([]byte, 4096)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,88 @@
|
||||||
|
package sync
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func withoutSyncRetrySleep(t *testing.T) {
|
||||||
|
t.Helper()
|
||||||
|
original := syncRetrySleep
|
||||||
|
syncRetrySleep = func(time.Duration) {}
|
||||||
|
t.Cleanup(func() {
|
||||||
|
syncRetrySleep = original
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushRetriesTransientServerErrors(t *testing.T) {
|
||||||
|
withoutSyncRetrySleep(t)
|
||||||
|
|
||||||
|
attempts := 0
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.URL.Path != "/api/v1/sync/push" {
|
||||||
|
http.NotFound(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
attempts++
|
||||||
|
if attempts < 3 {
|
||||||
|
http.Error(w, "try again", http.StatusServiceUnavailable)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]interface{}{
|
||||||
|
"accepted": []string{"op-1"},
|
||||||
|
"count": 1,
|
||||||
|
"conflicts": []map[string]interface{}{},
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(server.URL, "", "device-1", t.TempDir())
|
||||||
|
client.DeviceToken = "token"
|
||||||
|
result, err := client.Push([]Op{{
|
||||||
|
OpID: "op-1",
|
||||||
|
EntityType: EntityFile,
|
||||||
|
EntityID: "Docs/one.txt",
|
||||||
|
OpType: OpCreate,
|
||||||
|
CreatedAt: time.Now().UTC().Format(time.RFC3339),
|
||||||
|
}})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Push: %v", err)
|
||||||
|
}
|
||||||
|
if attempts != 3 {
|
||||||
|
t.Fatalf("attempts = %d, want 3", attempts)
|
||||||
|
}
|
||||||
|
if len(result.Accepted) != 1 || result.Accepted[0] != "op-1" {
|
||||||
|
t.Fatalf("push result = %#v", result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushDoesNotRetryClientErrors(t *testing.T) {
|
||||||
|
withoutSyncRetrySleep(t)
|
||||||
|
|
||||||
|
attempts := 0
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
attempts++
|
||||||
|
http.Error(w, "unauthorized", http.StatusUnauthorized)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(server.URL, "", "device-1", t.TempDir())
|
||||||
|
client.DeviceToken = "token"
|
||||||
|
_, err := client.Push([]Op{{
|
||||||
|
OpID: "op-1",
|
||||||
|
EntityType: EntityFile,
|
||||||
|
EntityID: "Docs/one.txt",
|
||||||
|
OpType: OpCreate,
|
||||||
|
CreatedAt: time.Now().UTC().Format(time.RFC3339),
|
||||||
|
}})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("Push should fail on unauthorized response")
|
||||||
|
}
|
||||||
|
if attempts != 1 {
|
||||||
|
t.Fatalf("attempts = %d, want 1", attempts)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue