diff --git a/docs/PLUGIN_RUNTIME.md b/docs/PLUGIN_RUNTIME.md index 11aafc0..c74d437 100644 --- a/docs/PLUGIN_RUNTIME.md +++ b/docs/PLUGIN_RUNTIME.md @@ -554,6 +554,8 @@ bundled runtime. Это реальный runtime contract для cooperative bun | `api.files.showInFolder(relativePath)` | ✅ Работает | Показывает vault file/folder в системном файловом менеджере, требует `files.openExternal` | | `api.workbench.openResource(request)` | ✅ Работает | Routes vault resources to `openProviders` | | `api.workbench.editResource(request)` | ✅ Работает | Same routing, forcing `mode: "edit"` | +| `api.sync.now()` | ✅ Работает | Push/pull с bounded retry/backoff для transient HTTP/network failures | +| `api.sync.status()` | ✅ Работает | Возвращает configured/connected/error/revoked state, lastError, unpushed count | | `api.dispose()` | ✅ Работает | Очищает command handlers и event subscriptions текущего API instance | Ограничения: diff --git a/internal/core/sync/client.go b/internal/core/sync/client.go index a59c292..1b7ccf5 100644 --- a/internal/core/sync/client.go +++ b/internal/core/sync/client.go @@ -61,6 +61,10 @@ type Client struct { HTTP *http.Client } +var syncRetrySleep = time.Sleep + +const syncHTTPAttempts = 3 + // NewClient creates a sync client. func NewClient(serverURL, apiKey, deviceID, vaultRoot string) *Client { return &Client{ @@ -318,50 +322,102 @@ func (c *Client) post(path string, body, result interface{}) error { return err } } - req, err := http.NewRequest("POST", c.ServerURL+path, &b) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+c.bearerToken()) + payload := b.Bytes() - resp, err := c.HTTP.Do(req) - if err != nil { - return fmt.Errorf("http: %w", err) - } - defer resp.Body.Close() + var lastErr error + for attempt := 1; attempt <= syncHTTPAttempts; attempt++ { + req, err := http.NewRequest("POST", c.ServerURL+path, bytes.NewReader(payload)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+c.bearerToken()) - if resp.StatusCode >= 400 { - return c.readErrorBody(resp, resp.StatusCode) - } + resp, err := c.HTTP.Do(req) + if err != nil { + lastErr = fmt.Errorf("http: %w", err) + if attempt < syncHTTPAttempts { + syncRetrySleep(syncBackoffDelay(attempt)) + continue + } + return lastErr + } + if resp.StatusCode >= 400 { + if isTransientHTTPStatus(resp.StatusCode) && attempt < syncHTTPAttempts { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + syncRetrySleep(syncBackoffDelay(attempt)) + continue + } + err := c.readErrorBody(resp, resp.StatusCode) + _ = resp.Body.Close() + return err + } - if result != nil { - return json.NewDecoder(resp.Body).Decode(result) + if result != nil { + err := json.NewDecoder(resp.Body).Decode(result) + _ = resp.Body.Close() + return err + } + _ = resp.Body.Close() + return nil } - return nil + return lastErr } func (c *Client) get(path string, result interface{}) error { - req, err := http.NewRequest("GET", c.ServerURL+path, nil) - if err != nil { - return err - } - req.Header.Set("Authorization", "Bearer "+c.bearerToken()) + var lastErr error + for attempt := 1; attempt <= syncHTTPAttempts; attempt++ { + req, err := http.NewRequest("GET", c.ServerURL+path, nil) + if err != nil { + return err + } + req.Header.Set("Authorization", "Bearer "+c.bearerToken()) - resp, err := c.HTTP.Do(req) - if err != nil { - return fmt.Errorf("http: %w", err) - } - defer resp.Body.Close() + resp, err := c.HTTP.Do(req) + if err != nil { + lastErr = fmt.Errorf("http: %w", err) + if attempt < syncHTTPAttempts { + syncRetrySleep(syncBackoffDelay(attempt)) + continue + } + return lastErr + } + if resp.StatusCode >= 400 { + if isTransientHTTPStatus(resp.StatusCode) && attempt < syncHTTPAttempts { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + syncRetrySleep(syncBackoffDelay(attempt)) + continue + } + err := c.readErrorBody(resp, resp.StatusCode) + _ = resp.Body.Close() + return err + } - if resp.StatusCode >= 400 { - return c.readErrorBody(resp, resp.StatusCode) + if result != nil { + err := json.NewDecoder(resp.Body).Decode(result) + _ = resp.Body.Close() + return err + } + _ = resp.Body.Close() + return nil } + return lastErr +} - if result != nil { - return json.NewDecoder(resp.Body).Decode(result) +func syncBackoffDelay(attempt int) time.Duration { + return time.Duration(attempt) * 250 * time.Millisecond +} + +func isTransientHTTPStatus(statusCode int) bool { + switch statusCode { + case http.StatusRequestTimeout, http.StatusTooManyRequests, http.StatusInternalServerError, + http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: + return true + default: + return false } - return nil } func (c *Client) readErrorBody(resp *http.Response, statusCode int) error { diff --git a/internal/core/sync/client_test.go b/internal/core/sync/client_test.go new file mode 100644 index 0000000..95b9da5 --- /dev/null +++ b/internal/core/sync/client_test.go @@ -0,0 +1,88 @@ +package sync + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func withoutSyncRetrySleep(t *testing.T) { + t.Helper() + original := syncRetrySleep + syncRetrySleep = func(time.Duration) {} + t.Cleanup(func() { + syncRetrySleep = original + }) +} + +func TestPushRetriesTransientServerErrors(t *testing.T) { + withoutSyncRetrySleep(t) + + attempts := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/v1/sync/push" { + http.NotFound(w, r) + return + } + attempts++ + if attempts < 3 { + http.Error(w, "try again", http.StatusServiceUnavailable) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "accepted": []string{"op-1"}, + "count": 1, + "conflicts": []map[string]interface{}{}, + }) + })) + defer server.Close() + + client := NewClient(server.URL, "", "device-1", t.TempDir()) + client.DeviceToken = "token" + result, err := client.Push([]Op{{ + OpID: "op-1", + EntityType: EntityFile, + EntityID: "Docs/one.txt", + OpType: OpCreate, + CreatedAt: time.Now().UTC().Format(time.RFC3339), + }}) + if err != nil { + t.Fatalf("Push: %v", err) + } + if attempts != 3 { + t.Fatalf("attempts = %d, want 3", attempts) + } + if len(result.Accepted) != 1 || result.Accepted[0] != "op-1" { + t.Fatalf("push result = %#v", result) + } +} + +func TestPushDoesNotRetryClientErrors(t *testing.T) { + withoutSyncRetrySleep(t) + + attempts := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attempts++ + http.Error(w, "unauthorized", http.StatusUnauthorized) + })) + defer server.Close() + + client := NewClient(server.URL, "", "device-1", t.TempDir()) + client.DeviceToken = "token" + _, err := client.Push([]Op{{ + OpID: "op-1", + EntityType: EntityFile, + EntityID: "Docs/one.txt", + OpType: OpCreate, + CreatedAt: time.Now().UTC().Format(time.RFC3339), + }}) + if err == nil { + t.Fatal("Push should fail on unauthorized response") + } + if attempts != 1 { + t.Fatalf("attempts = %d, want 1", attempts) + } +}