verstak/internal/core/watcher/watcher.go

345 lines
7.8 KiB
Go

package watcher
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"verstak/internal/core/activity"
"verstak/internal/core/files"
"verstak/internal/core/nodes"
)
// DebounceWindow is the quiet period that has to pass after the most recent
// fsnotify event before the buffered burst of events is processed.
const DebounceWindow time.Duration = 2 * time.Second
// Watcher wraps fsnotify to track filesystem changes in real time.
// Events are buffered and applied to the file records only after no further
// event has arrived for DebounceWindow.
type Watcher struct {
// vaultRoot is the base directory: node FsPath values are joined onto it to
// obtain the directories to watch, and event paths are made relative to it.
vaultRoot string
// nodes is queried for the nodes (and their FsPath) to watch and to match
// against event paths.
nodes *nodes.Repository
// files is the service through which file records are listed, added and
// marked missing.
files *files.Service
// activity receives one entry per change the watcher applies.
activity *activity.Service
// w is the underlying fsnotify watcher; Start creates it and Stop closes it.
w *fsnotify.Watcher
// mu is held by Start, Stop, IsWatching, bufferEvent and flushPending while
// they touch watching, pending and debounceT.
mu sync.Mutex
// done is created by Start and closed by Stop to make the event loop exit.
done chan struct{}
// watching is true between a successful Start and the next Stop.
watching bool
// pending is the debounce buffer. Events are grouped by the directory that
// contains the event's path (filepath.Dir of the event name).
pending map[string][]fsnotify.Event
// debounceT is the timer that calls flushPending; bufferEvent replaces it on
// every buffered event.
debounceT *time.Timer
}
// NewWatcher builds an idle Watcher bound to the given vault root and
// services. Nothing is watched until Start is called.
func NewWatcher(vaultRoot string, nr *nodes.Repository, fs *files.Service, as *activity.Service) *Watcher {
	idle := new(Watcher)
	idle.vaultRoot = vaultRoot
	idle.nodes = nr
	idle.files = fs
	idle.activity = as
	// The debounce buffer has to exist up front: bufferEvent appends into it.
	idle.pending = map[string][]fsnotify.Event{}
	return idle
}
// Start creates the underlying fsnotify watcher, registers every node
// directory that exists on disk (the whole subtree, minus hidden
// directories, via addRecursive) and launches the event loop goroutine.
//
// Calling Start on a watcher that is already running is a no-op. An error is
// returned when fsnotify cannot be initialised or the node list cannot be
// loaded; failing to watch an individual node directory is only logged.
func (w *Watcher) Start() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.watching {
		return nil
	}

	notifier, err := fsnotify.NewWatcher()
	if err != nil {
		return fmt.Errorf("fsnotify: %w", err)
	}
	w.w, w.done = notifier, make(chan struct{})

	nodeList, err := w.nodes.ListAllWithFsPath()
	if err != nil {
		notifier.Close()
		return fmt.Errorf("list nodes: %w", err)
	}

	var watched int
	for _, n := range nodeList {
		root := filepath.Join(w.vaultRoot, n.FsPath)
		info, statErr := os.Stat(root)
		if statErr != nil || !info.IsDir() {
			continue // the node has no directory on disk
		}
		if addErr := w.addRecursive(root); addErr != nil {
			log.Printf("[watcher] add watch %s: %v", n.FsPath, addErr)
			continue
		}
		watched++
	}

	w.watching = true
	log.Printf("[watcher] started watching %d directories", watched)
	go w.loop()
	return nil
}
// Stop shuts a running watcher down: it cancels the debounce timer, closes
// the done channel so the event loop exits, and closes the fsnotify watcher.
// Calling Stop on a watcher that is not running does nothing.
func (w *Watcher) Stop() {
	w.mu.Lock()
	defer w.mu.Unlock()

	if !w.watching {
		return
	}
	w.watching = false

	if timer := w.debounceT; timer != nil {
		timer.Stop()
	}
	close(w.done)
	if notifier := w.w; notifier != nil {
		notifier.Close()
	}
	log.Printf("[watcher] stopped")
}
// IsWatching reports whether the watcher is currently running, i.e. Start
// has succeeded and Stop has not been called since.
func (w *Watcher) IsWatching() bool {
	w.mu.Lock()
	active := w.watching
	w.mu.Unlock()
	return active
}
// addRecursive walks dir and registers dir itself plus every nested
// directory with the fsnotify watcher. A directory whose name starts with a
// dot is skipped together with everything below it. Entries the walk cannot
// read are ignored; the first failing watch registration aborts the walk and
// is returned.
func (w *Watcher) addRecursive(dir string) error {
	visit := func(path string, entry os.DirEntry, walkErr error) error {
		// Unreadable entries and plain files are of no interest.
		if walkErr != nil || !entry.IsDir() {
			return nil
		}
		if name := entry.Name(); name != "." && strings.HasPrefix(name, ".") {
			return filepath.SkipDir
		}
		return w.w.Add(path)
	}
	return filepath.WalkDir(dir, visit)
}
// loop is the event-processing goroutine launched by Start. It forwards
// fsnotify events to bufferEvent, logs watcher errors, and returns when the
// done channel is closed or fsnotify closes one of its channels.
//
// The fsnotify watcher and the done channel are read once, under the mutex,
// instead of on every iteration: Start replaces both fields when the watcher
// is restarted, and re-reading them unlocked would race with that write and
// could make a lingering loop switch over to the new watcher.
func (w *Watcher) loop() {
	w.mu.Lock()
	fw, done := w.w, w.done
	w.mu.Unlock()
	if fw == nil {
		return // never started; nothing to read from
	}

	for {
		select {
		case event, ok := <-fw.Events:
			if !ok {
				return
			}
			w.bufferEvent(event)
		case err, ok := <-fw.Errors:
			if !ok {
				return
			}
			log.Printf("[watcher] error: %v", err)
		case <-done:
			return
		}
	}
}
// bufferEvent queues an event received by loop and (re)arms the debounce
// timer, so that flushPending runs only once no event has arrived for
// DebounceWindow.
//
// Events for hidden or .verstak paths are dropped. The check is made on the
// path relative to the vault root: testing the raw event path would discard
// every event whenever the vault itself is stored below a dot-directory
// (for example ~/.local/share/...). A path that resolves outside the vault
// starts with ".." and is therefore dropped as well. If the relative path
// cannot be computed, the raw event path is checked instead.
func (w *Watcher) bufferEvent(event fsnotify.Event) {
	w.mu.Lock()
	defer w.mu.Unlock()

	checked := event.Name
	if rel, err := filepath.Rel(w.vaultRoot, event.Name); err == nil {
		checked = rel
	}
	if isHiddenOrMeta(checked) {
		return
	}

	// Events are grouped by the directory that contains the affected path.
	dir := filepath.Dir(event.Name)
	w.pending[dir] = append(w.pending[dir], event)

	// Restart the quiet period on every event.
	if w.debounceT != nil {
		w.debounceT.Stop()
	}
	w.debounceT = time.AfterFunc(DebounceWindow, w.flushPending)
}
// flushPending is the debounce timer callback. It takes over the buffer of
// pending events, collapses it to one event per path and hands each merged
// event to handleEvent.
//
// The ops of all events seen for a path during the window are OR-ed
// together. Keeping only the last event would lose information: a new file
// normally produces Create followed by Write (and often Chmod), and looking
// at the trailing Write or Chmod alone would never register the file.
func (w *Watcher) flushPending() {
	w.mu.Lock()
	events := w.pending
	w.pending = make(map[string][]fsnotify.Event)
	w.mu.Unlock()

	byFile := make(map[string]fsnotify.Event)
	for _, evts := range events {
		for _, e := range evts {
			if prev, seen := byFile[e.Name]; seen {
				e.Op |= prev.Op
			}
			byFile[e.Name] = e
		}
	}

	for absPath, ev := range byFile {
		rel, err := filepath.Rel(w.vaultRoot, absPath)
		if err != nil {
			continue
		}
		w.handleEvent(rel, absPath, ev)
	}
}

// handleEvent reconciles the file records with one (possibly merged) event
// for absPath, whose vault-relative path is rel.
//
// Events are handled after the debounce delay, so the op bits are treated as
// hints and the current state on disk decides what happens:
//   - Create and the path exists: a directory is added to the watch list; a
//     file is registered, or restored/refreshed when it is already tracked.
//   - Remove or Rename (including a Create whose path has vanished again):
//     a tracked record is marked missing.
//   - Write: a tracked record gets its size and hash refreshed.
//   - Chmod on its own is ignored.
func (w *Watcher) handleEvent(rel, absPath string, ev fsnotify.Event) {
	if ev.Has(fsnotify.Create) {
		if fi, err := os.Stat(absPath); err == nil {
			if fi.IsDir() {
				// New directory appeared: start watching it.
				if err := w.addRecursive(absPath); err != nil {
					log.Printf("[watcher] add watch %s: %v", rel, err)
				}
				return
			}
			w.onFileAppeared(rel, absPath, fi.Name(), ev.Has(fsnotify.Write))
			return
		}
		// The path is gone again; any Remove/Rename bit is handled below.
	}

	switch {
	case ev.Has(fsnotify.Remove) || ev.Has(fsnotify.Rename):
		w.onFileGone(rel)
	case ev.Has(fsnotify.Write):
		rec, err := w.findRecordByPath(rel)
		if err != nil {
			return // unknown file, not tracked
		}
		w.refreshContent(rec.ID, rec.NodeID, rec.Filename, rel, absPath)
	}
}

// onFileAppeared handles a regular file that exists on disk after a Create.
// A file already tracked under its node is un-marked as missing and, when the
// burst also contained a Write, re-hashed; an untracked file is registered.
func (w *Watcher) onFileAppeared(rel, absPath, name string, written bool) {
	parentDir := filepath.Dir(rel)
	node, err := w.findNodeByFsPath(parentDir)
	if err != nil {
		log.Printf("[watcher] no node for path %s: %v", parentDir, err)
		return
	}

	existing, err := w.files.ListByNode(node.ID)
	if err != nil {
		// Without the list we cannot tell whether the file is already
		// tracked; adding it blindly could create a duplicate record.
		log.Printf("[watcher] list files of node %v: %v", node.ID, err)
		return
	}
	for _, rec := range existing {
		if rec.Path != rel {
			continue
		}
		// Already tracked (e.g. race with the scanner).
		if rec.Missing {
			if err := w.files.MarkMissing(rec.ID, false); err != nil {
				log.Printf("[watcher] restore %s: %v", rel, err)
				return
			}
			w.logActivity(node.ID, activity.TypeFileRestored, rec.Filename, rel)
		}
		if written {
			w.refreshContent(rec.ID, node.ID, rec.Filename, rel, absPath)
		}
		return
	}

	if _, err := w.files.CopyIntoVault(node.ID, absPath, parentDir); err != nil {
		log.Printf("[watcher] auto-add file %s: %v", rel, err)
		return
	}
	w.logActivity(node.ID, activity.TypeFileAdded, name, rel)
}

// onFileGone marks the record tracked under rel as missing, unless it is
// unknown or already marked.
func (w *Watcher) onFileGone(rel string) {
	rec, err := w.findRecordByPath(rel)
	if err != nil {
		return // unknown file, not tracked
	}
	if rec.Missing {
		return
	}
	if err := w.files.MarkMissing(rec.ID, true); err != nil {
		log.Printf("[watcher] mark missing %s: %v", rel, err)
		return
	}
	w.logActivity(rec.NodeID, activity.TypeFileDeleted, filepath.Base(rel), rel)
}

// refreshContent re-hashes absPath and stores the new size and digest on the
// record with the given id.
func (w *Watcher) refreshContent(id, nodeID, filename, rel, absPath string) {
	sha, size := hashFileFast(absPath)
	if sha == "" {
		// hashFileFast reports failure as an empty digest; writing that
		// to the record would wipe the last known hash and size.
		log.Printf("[watcher] hash %s: file unreadable, record left unchanged", rel)
		return
	}
	if err := w.updateFileRecord(id, size, sha); err != nil {
		log.Printf("[watcher] update record %s: %v", rel, err)
		return
	}
	w.logActivity(nodeID, activity.TypeFileModified, filename, rel)
}
// findNodeByFsPath fetches the node repository's ListAllWithFsPath listing
// and returns the first node whose FsPath equals relDir exactly. An error is
// returned when the listing fails or no node matches.
func (w *Watcher) findNodeByFsPath(relDir string) (*nodes.Node, error) {
	candidates, err := w.nodes.ListAllWithFsPath()
	if err != nil {
		return nil, err
	}
	for idx := range candidates {
		if n := &candidates[idx]; n.FsPath == relDir {
			return n, nil
		}
	}
	return nil, fmt.Errorf("no node with fs_path=%s", relDir)
}
// findRecordByPath scans the result of ListAllVaultWithTrashed and returns
// the first record whose Path equals relPath exactly. Which of several
// matching records wins depends on the order of that listing (presumably
// active records before trashed ones; confirm against the files service).
// An error is returned when the listing fails or no record matches.
func (w *Watcher) findRecordByPath(relPath string) (*files.Record, error) {
	records, err := w.files.ListAllVaultWithTrashed()
	if err != nil {
		return nil, err
	}
	for idx := range records {
		if rec := &records[idx]; rec.Path == relPath {
			return rec, nil
		}
	}
	return nil, fmt.Errorf("no record for path %s", relPath)
}
// logActivity records a file-targeted activity entry of the given event type
// for nodeID, with title as its title and path as the affected path.
// Recording is best effort: a failure is logged and otherwise ignored, so it
// never interrupts event handling.
func (w *Watcher) logActivity(nodeID, eventType, title, path string) {
	if err := w.activity.Record(nodeID, activity.TargetFile, "", path, eventType, title, ""); err != nil {
		log.Printf("[watcher] record activity %s for %s: %v", eventType, path, err)
	}
}
// updateFileRecord stores a new size and SHA-256 digest on the files row
// with the given id, stamps updated_at with the current UTC time in RFC 3339
// format and clears the row's missing flag. It returns the error of the
// UPDATE statement, if any.
func (w *Watcher) updateFileRecord(id string, size int64, sha string) error {
	const query = `UPDATE files SET size=?, sha256=?, updated_at=?, missing=0 WHERE id=?`

	stamp := time.Now().UTC().Format(time.RFC3339)
	if _, err := w.files.DB().Exec(query, size, sha, stamp, id); err != nil {
		return err
	}
	return nil
}
// isHiddenOrMeta reports whether any component of the cleaned path is the
// .verstak metadata directory or starts with a dot. A lone "." component
// does not count; a ".." component does.
func isHiddenOrMeta(path string) bool {
	cleaned := filepath.Clean(path)
	for _, segment := range strings.Split(cleaned, string(filepath.Separator)) {
		switch {
		case segment == ".verstak":
			return true
		case segment != "." && strings.HasPrefix(segment, "."):
			return true
		}
	}
	return false
}
// hashFileFast streams the file at absPath through SHA-256 and returns the
// lowercase hex digest together with the number of bytes read. When the file
// cannot be opened or read it returns ("", 0); a successful call never yields
// an empty digest, so an empty string signals failure.
func hashFileFast(absPath string) (string, int64) {
	src, openErr := os.Open(absPath)
	if openErr != nil {
		return "", 0
	}
	defer src.Close()

	digest := sha256.New()
	size, copyErr := io.Copy(digest, src)
	if copyErr != nil {
		return "", 0
	}

	sum := digest.Sum(nil)
	return hex.EncodeToString(sum), size
}