package watcher import ( "crypto/sha256" "encoding/hex" "fmt" "io" "log" "os" "path/filepath" "strings" "sync" "time" "github.com/fsnotify/fsnotify" "verstak/internal/core/activity" "verstak/internal/core/files" "verstak/internal/core/nodes" ) // DebounceWindow is how long we wait for a burst of fsnotify events to settle. const DebounceWindow = 2 * time.Second // Watcher wraps fsnotify to track filesystem changes in real time. type Watcher struct { vaultRoot string nodes *nodes.Repository files *files.Service activity *activity.Service w *fsnotify.Watcher mu sync.Mutex done chan struct{} watching bool // debounce buffers pending map[string][]fsnotify.Event // key = fsPath debounceT *time.Timer } // NewWatcher creates but does not start the watcher. func NewWatcher(vaultRoot string, nr *nodes.Repository, fs *files.Service, as *activity.Service) *Watcher { return &Watcher{ vaultRoot: vaultRoot, nodes: nr, files: fs, activity: as, pending: make(map[string][]fsnotify.Event), } } // Start begins watching all node directories. Returns an error if fsnotify fails. func (w *Watcher) Start() error { w.mu.Lock() defer w.mu.Unlock() if w.watching { return nil } fw, err := fsnotify.NewWatcher() if err != nil { return fmt.Errorf("fsnotify: %w", err) } w.w = fw w.done = make(chan struct{}) // Collect all node directories to watch. allNodes, err := w.nodes.ListAllWithFsPath() if err != nil { fw.Close() return fmt.Errorf("list nodes: %w", err) } watched := 0 for _, node := range allNodes { absDir := filepath.Join(w.vaultRoot, node.FsPath) if info, statErr := os.Stat(absDir); statErr == nil && info.IsDir() { // Watch the directory and its direct subdirectories. if err := w.addRecursive(absDir); err != nil { log.Printf("[watcher] add watch %s: %v", node.FsPath, err) continue } watched++ } } w.watching = true log.Printf("[watcher] started watching %d directories", watched) go w.loop() return nil } // Stop gracefully shuts down the watcher. func (w *Watcher) Stop() { w.mu.Lock() defer w.mu.Unlock() if !w.watching { return } w.watching = false if w.debounceT != nil { w.debounceT.Stop() } close(w.done) if w.w != nil { w.w.Close() } log.Printf("[watcher] stopped") } // IsWatching returns whether the watcher is active. func (w *Watcher) IsWatching() bool { w.mu.Lock() defer w.mu.Unlock() return w.watching } // addRecursive adds watchers for dir and all its non-hidden subdirectories. func (w *Watcher) addRecursive(dir string) error { return filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error { if err != nil { return nil } if !d.IsDir() { return nil } if d.Name() != "." && strings.HasPrefix(d.Name(), ".") { return filepath.SkipDir } return w.w.Add(path) }) } // loop is the main event processing goroutine. func (w *Watcher) loop() { for { select { case event, ok := <-w.w.Events: if !ok { return } w.bufferEvent(event) case err, ok := <-w.w.Errors: if !ok { return } log.Printf("[watcher] error: %v", err) case <-w.done: return } } } // bufferEvent adds an event to the debounce buffer. func (w *Watcher) bufferEvent(event fsnotify.Event) { w.mu.Lock() defer w.mu.Unlock() // Skip hidden files and .verstak directory. if isHiddenOrMeta(event.Name) { return } dir := filepath.Dir(event.Name) w.pending[dir] = append(w.pending[dir], event) if w.debounceT != nil { w.debounceT.Stop() } w.debounceT = time.AfterFunc(DebounceWindow, w.flushPending) } // flushPending processes all buffered events. func (w *Watcher) flushPending() { w.mu.Lock() events := w.pending w.pending = make(map[string][]fsnotify.Event) w.mu.Unlock() // Group by file path, keeping the most recent event of each kind. byFile := make(map[string]fsnotify.Event) for _, evts := range events { for _, e := range evts { byFile[e.Name] = e } } for absPath, ev := range byFile { rel, err := filepath.Rel(w.vaultRoot, absPath) if err != nil { continue } w.handleEvent(rel, absPath, ev) } } // handleEvent processes a single filesystem event. func (w *Watcher) handleEvent(rel, absPath string, ev fsnotify.Event) { switch { case ev.Has(fsnotify.Create): // File created — check for existing record, add if not present. fi, err := os.Stat(absPath) if err != nil { return } if fi.IsDir() { // New directory appeared — add watcher. _ = w.addRecursive(absPath) return } // Find the parent node by FsPath. parentDir := filepath.Dir(rel) node, err := w.findNodeByFsPath(parentDir) if err != nil { log.Printf("[watcher] no node for path %s: %v", parentDir, err) return } // Check if file record already exists (race with scanner). existing, _ := w.files.ListByNode(node.ID) for _, rec := range existing { if rec.Path == rel { // Already tracked — just mark as not missing. if rec.Missing { _ = w.files.MarkMissing(rec.ID, false) w.logActivity(node.ID, activity.TypeFileRestored, rec.Filename, rel) } return } } // New file — check if it's already inside the vault. // If absPath is under vaultRoot, don't copy — just create a record. if isUnderVault(absPath, w.vaultRoot) { _, err = w.files.AddExternal(node.ID, absPath) if err != nil { log.Printf("[watcher] auto-add in-vault file %s: %v", rel, err) return } } else { _, err = w.files.CopyIntoVault(node.ID, absPath, parentDir) if err != nil { log.Printf("[watcher] auto-add file %s: %v", rel, err) return } } w.logActivity(node.ID, activity.TypeFileAdded, fi.Name(), rel) case ev.Has(fsnotify.Remove) || ev.Has(fsnotify.Rename): // File deleted or renamed away. filename := filepath.Base(rel) rec, err := w.findRecordByPath(rel) if err != nil { return // unknown file, not tracked } if !rec.Missing { _ = w.files.MarkMissing(rec.ID, true) w.logActivity(rec.NodeID, activity.TypeFileDeleted, filename, rel) } case ev.Has(fsnotify.Write): // Content modified. rec, err := w.findRecordByPath(rel) if err != nil { return } sha, size := hashFileFast(absPath) _ = w.updateFileRecord(rec.ID, size, sha) w.logActivity(rec.NodeID, activity.TypeFileModified, rec.Filename, rel) case ev.Has(fsnotify.Chmod): // Permission changes — ignore. } } // findNodeByFsPath finds the node whose FsPath matches the given relative directory. func (w *Watcher) findNodeByFsPath(relDir string) (*nodes.Node, error) { all, err := w.nodes.ListAllWithFsPath() if err != nil { return nil, err } for i := range all { if all[i].FsPath == relDir { return &all[i], nil } } return nil, fmt.Errorf("no node with fs_path=%s", relDir) } // findRecordByPath finds a file record by its vault-relative path. // Checks active records first, then trashed. func (w *Watcher) findRecordByPath(relPath string) (*files.Record, error) { all, err := w.files.ListAllVaultWithTrashed() if err != nil { return nil, err } for i := range all { if all[i].Path == relPath { return &all[i], nil } } return nil, fmt.Errorf("no record for path %s", relPath) } func (w *Watcher) logActivity(nodeID, eventType, title, path string) { _ = w.activity.Record(nodeID, activity.TargetFile, "", path, eventType, title, "") } func (w *Watcher) updateFileRecord(id string, size int64, sha string) error { _, err := w.files.DB().Exec( `UPDATE files SET size=?, sha256=?, updated_at=?, missing=0 WHERE id=?`, size, sha, time.Now().UTC().Format(time.RFC3339), id) return err } // isHiddenOrMeta returns true for files in .verstak or hidden directories. func isHiddenOrMeta(path string) bool { parts := strings.Split(filepath.Clean(path), string(filepath.Separator)) for _, p := range parts { if p == ".verstak" { return true } if strings.HasPrefix(p, ".") && p != "." { return true } } return false } // hashFileFast computes SHA256 without stat (caller already has fi). func hashFileFast(absPath string) (string, int64) { f, err := os.Open(absPath) if err != nil { return "", 0 } defer f.Close() h := sha256.New() n, err := io.Copy(h, f) if err != nil { return "", 0 } return hex.EncodeToString(h.Sum(nil)), n } // isUnderVault reports whether absPath is inside vaultRoot. func isUnderVault(absPath, vaultRoot string) bool { absPath, _ = filepath.Abs(absPath) vaultRoot, _ = filepath.Abs(vaultRoot) absPath = filepath.Clean(absPath) vaultRoot = filepath.Clean(vaultRoot) return strings.HasPrefix(absPath, vaultRoot+string(filepath.Separator)) || absPath == vaultRoot }