361 lines
8.4 KiB
Go
361 lines
8.4 KiB
Go
package watcher
|
|
|
|
import (
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fsnotify/fsnotify"
|
|
|
|
"verstak/internal/core/activity"
|
|
"verstak/internal/core/files"
|
|
"verstak/internal/core/nodes"
|
|
)
|
|
|
|
// DebounceWindow is the quiet period required after the most recent fsnotify
// event before the buffered batch is processed. Every new event restarts the
// countdown.
const DebounceWindow time.Duration = 2 * time.Second
|
|
|
|
// Watcher wraps fsnotify to track filesystem changes in real time.
|
|
type Watcher struct {
|
|
vaultRoot string
|
|
nodes *nodes.Repository
|
|
files *files.Service
|
|
activity *activity.Service
|
|
|
|
w *fsnotify.Watcher
|
|
mu sync.Mutex
|
|
done chan struct{}
|
|
watching bool
|
|
|
|
// debounce buffers
|
|
pending map[string][]fsnotify.Event // key = fsPath
|
|
debounceT *time.Timer
|
|
}
|
|
|
|
// NewWatcher creates but does not start the watcher.
|
|
func NewWatcher(vaultRoot string, nr *nodes.Repository, fs *files.Service, as *activity.Service) *Watcher {
|
|
return &Watcher{
|
|
vaultRoot: vaultRoot,
|
|
nodes: nr,
|
|
files: fs,
|
|
activity: as,
|
|
pending: make(map[string][]fsnotify.Event),
|
|
}
|
|
}
|
|
|
|
// Start begins watching all node directories. Returns an error if fsnotify fails.
|
|
func (w *Watcher) Start() error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if w.watching {
|
|
return nil
|
|
}
|
|
|
|
fw, err := fsnotify.NewWatcher()
|
|
if err != nil {
|
|
return fmt.Errorf("fsnotify: %w", err)
|
|
}
|
|
w.w = fw
|
|
w.done = make(chan struct{})
|
|
|
|
// Collect all node directories to watch.
|
|
allNodes, err := w.nodes.ListAllWithFsPath()
|
|
if err != nil {
|
|
fw.Close()
|
|
return fmt.Errorf("list nodes: %w", err)
|
|
}
|
|
|
|
watched := 0
|
|
for _, node := range allNodes {
|
|
absDir := filepath.Join(w.vaultRoot, node.FsPath)
|
|
if info, statErr := os.Stat(absDir); statErr == nil && info.IsDir() {
|
|
// Watch the directory and its direct subdirectories.
|
|
if err := w.addRecursive(absDir); err != nil {
|
|
log.Printf("[watcher] add watch %s: %v", node.FsPath, err)
|
|
continue
|
|
}
|
|
watched++
|
|
}
|
|
}
|
|
|
|
w.watching = true
|
|
log.Printf("[watcher] started watching %d directories", watched)
|
|
|
|
go w.loop()
|
|
return nil
|
|
}
|
|
|
|
// Stop gracefully shuts down the watcher.
|
|
func (w *Watcher) Stop() {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if !w.watching {
|
|
return
|
|
}
|
|
w.watching = false
|
|
if w.debounceT != nil {
|
|
w.debounceT.Stop()
|
|
}
|
|
close(w.done)
|
|
if w.w != nil {
|
|
w.w.Close()
|
|
}
|
|
log.Printf("[watcher] stopped")
|
|
}
|
|
|
|
// IsWatching returns whether the watcher is active.
|
|
func (w *Watcher) IsWatching() bool {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
return w.watching
|
|
}
|
|
|
|
// addRecursive adds watchers for dir and all its non-hidden subdirectories.
|
|
func (w *Watcher) addRecursive(dir string) error {
|
|
return filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if !d.IsDir() {
|
|
return nil
|
|
}
|
|
if d.Name() != "." && strings.HasPrefix(d.Name(), ".") {
|
|
return filepath.SkipDir
|
|
}
|
|
return w.w.Add(path)
|
|
})
|
|
}
|
|
|
|
// loop is the main event processing goroutine.
|
|
func (w *Watcher) loop() {
|
|
for {
|
|
select {
|
|
case event, ok := <-w.w.Events:
|
|
if !ok {
|
|
return
|
|
}
|
|
w.bufferEvent(event)
|
|
|
|
case err, ok := <-w.w.Errors:
|
|
if !ok {
|
|
return
|
|
}
|
|
log.Printf("[watcher] error: %v", err)
|
|
|
|
case <-w.done:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// bufferEvent adds an event to the debounce buffer.
|
|
func (w *Watcher) bufferEvent(event fsnotify.Event) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
// Skip hidden files and .verstak directory.
|
|
if isHiddenOrMeta(event.Name) {
|
|
return
|
|
}
|
|
|
|
dir := filepath.Dir(event.Name)
|
|
w.pending[dir] = append(w.pending[dir], event)
|
|
|
|
if w.debounceT != nil {
|
|
w.debounceT.Stop()
|
|
}
|
|
w.debounceT = time.AfterFunc(DebounceWindow, w.flushPending)
|
|
}
|
|
|
|
// flushPending processes all buffered events.
|
|
func (w *Watcher) flushPending() {
|
|
w.mu.Lock()
|
|
events := w.pending
|
|
w.pending = make(map[string][]fsnotify.Event)
|
|
w.mu.Unlock()
|
|
|
|
// Group by file path, keeping the most recent event of each kind.
|
|
byFile := make(map[string]fsnotify.Event)
|
|
for _, evts := range events {
|
|
for _, e := range evts {
|
|
byFile[e.Name] = e
|
|
}
|
|
}
|
|
|
|
for absPath, ev := range byFile {
|
|
rel, err := filepath.Rel(w.vaultRoot, absPath)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
w.handleEvent(rel, absPath, ev)
|
|
}
|
|
}
|
|
|
|
// handleEvent processes a single filesystem event.
|
|
func (w *Watcher) handleEvent(rel, absPath string, ev fsnotify.Event) {
|
|
switch {
|
|
case ev.Has(fsnotify.Create):
|
|
// File created — check for existing record, add if not present.
|
|
fi, err := os.Stat(absPath)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if fi.IsDir() {
|
|
// New directory appeared — add watcher.
|
|
_ = w.addRecursive(absPath)
|
|
return
|
|
}
|
|
|
|
// Find the parent node by FsPath.
|
|
parentDir := filepath.Dir(rel)
|
|
node, err := w.findNodeByFsPath(parentDir)
|
|
if err != nil {
|
|
log.Printf("[watcher] no node for path %s: %v", parentDir, err)
|
|
return
|
|
}
|
|
|
|
// Check if file record already exists (race with scanner).
|
|
existing, _ := w.files.ListByNode(node.ID)
|
|
for _, rec := range existing {
|
|
if rec.Path == rel {
|
|
// Already tracked — just mark as not missing.
|
|
if rec.Missing {
|
|
_ = w.files.MarkMissing(rec.ID, false)
|
|
w.logActivity(node.ID, activity.TypeFileRestored, rec.Filename, rel)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
|
|
// New file — check if it's already inside the vault.
|
|
// If absPath is under vaultRoot, don't copy — just create a record.
|
|
if isUnderVault(absPath, w.vaultRoot) {
|
|
_, err = w.files.AddExternal(node.ID, absPath)
|
|
if err != nil {
|
|
log.Printf("[watcher] auto-add in-vault file %s: %v", rel, err)
|
|
return
|
|
}
|
|
} else {
|
|
_, err = w.files.CopyIntoVault(node.ID, absPath, parentDir)
|
|
if err != nil {
|
|
log.Printf("[watcher] auto-add file %s: %v", rel, err)
|
|
return
|
|
}
|
|
}
|
|
w.logActivity(node.ID, activity.TypeFileAdded, fi.Name(), rel)
|
|
|
|
case ev.Has(fsnotify.Remove) || ev.Has(fsnotify.Rename):
|
|
// File deleted or renamed away.
|
|
filename := filepath.Base(rel)
|
|
|
|
rec, err := w.findRecordByPath(rel)
|
|
if err != nil {
|
|
return // unknown file, not tracked
|
|
}
|
|
if !rec.Missing {
|
|
_ = w.files.MarkMissing(rec.ID, true)
|
|
w.logActivity(rec.NodeID, activity.TypeFileDeleted, filename, rel)
|
|
}
|
|
|
|
case ev.Has(fsnotify.Write):
|
|
// Content modified.
|
|
rec, err := w.findRecordByPath(rel)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
sha, size := hashFileFast(absPath)
|
|
_ = w.updateFileRecord(rec.ID, size, sha)
|
|
w.logActivity(rec.NodeID, activity.TypeFileModified, rec.Filename, rel)
|
|
|
|
case ev.Has(fsnotify.Chmod):
|
|
// Permission changes — ignore.
|
|
}
|
|
}
|
|
|
|
// findNodeByFsPath finds the node whose FsPath matches the given relative directory.
|
|
func (w *Watcher) findNodeByFsPath(relDir string) (*nodes.Node, error) {
|
|
all, err := w.nodes.ListAllWithFsPath()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for i := range all {
|
|
if all[i].FsPath == relDir {
|
|
return &all[i], nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("no node with fs_path=%s", relDir)
|
|
}
|
|
|
|
// findRecordByPath finds a file record by its vault-relative path.
|
|
// Checks active records first, then trashed.
|
|
func (w *Watcher) findRecordByPath(relPath string) (*files.Record, error) {
|
|
all, err := w.files.ListAllVaultWithTrashed()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for i := range all {
|
|
if all[i].Path == relPath {
|
|
return &all[i], nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("no record for path %s", relPath)
|
|
}
|
|
|
|
func (w *Watcher) logActivity(nodeID, eventType, title, path string) {
|
|
_ = w.activity.Record(nodeID, activity.TargetFile, "", path, eventType, title, "")
|
|
}
|
|
|
|
func (w *Watcher) updateFileRecord(id string, size int64, sha string) error {
|
|
_, err := w.files.DB().Exec(
|
|
`UPDATE files SET size=?, sha256=?, updated_at=?, missing=0 WHERE id=?`,
|
|
size, sha, time.Now().UTC().Format(time.RFC3339), id)
|
|
return err
|
|
}
|
|
|
|
// isHiddenOrMeta reports whether any element of path is a dot-file or
// dot-directory. This includes the .verstak metadata directory, which needs
// no separate test because its name starts with a dot.
//
// The navigation elements "." and ".." are not hidden entries and are
// skipped, so a relative path that climbs out of a directory ("../notes/a")
// is not misreported as hidden.
func isHiddenOrMeta(path string) bool {
	cleaned := filepath.Clean(path)
	for _, part := range strings.Split(cleaned, string(filepath.Separator)) {
		if part == "." || part == ".." {
			continue
		}
		if strings.HasPrefix(part, ".") {
			return true
		}
	}
	return false
}
|
|
|
|
// hashFileFast streams the file at absPath through SHA-256 and returns the
// lowercase hex digest together with the number of bytes read. If the file
// cannot be opened or read it returns ("", 0).
func hashFileFast(absPath string) (string, int64) {
	src, openErr := os.Open(absPath)
	if openErr != nil {
		return "", 0
	}
	defer src.Close()

	digest := sha256.New()
	total, copyErr := io.Copy(digest, src)
	if copyErr != nil {
		return "", 0
	}

	sum := digest.Sum(nil)
	return hex.EncodeToString(sum), total
}
|
|
|
|
// isUnderVault reports whether absPath is vaultRoot itself or lies somewhere
// below it. Both arguments are made absolute first; if either cannot be
// resolved, or no relative path exists between them, the result is false.
//
// Containment is decided from the relative path instead of a string prefix.
// A prefix test breaks when vaultRoot is a filesystem root, because
// root + separator then never matches any real path.
func isUnderVault(absPath, vaultRoot string) bool {
	target, err := filepath.Abs(absPath)
	if err != nil {
		return false
	}
	root, err := filepath.Abs(vaultRoot)
	if err != nil {
		return false
	}
	rel, err := filepath.Rel(root, target)
	if err != nil {
		return false
	}
	// Outside the root means the relative path is ".." or starts with "../";
	// a name that merely begins with two dots ("..notes") is still inside.
	return rel != ".." && !strings.HasPrefix(rel, ".."+string(filepath.Separator))
}
|