verstak/cmd/verstak-server/server.go

132 lines
3.7 KiB
Go

package main
import (
"database/sql"
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
_ "github.com/mattn/go-sqlite3"
)
// pairRateLimit keeps a per-key attempt counter; callers key it by client IP.
//
// The zero value is ready to use because allow allocates the map on first use.
// A pairRateLimit contains a sync.Mutex and therefore must not be copied once
// it is in use.
type pairRateLimit struct {
	// mu guards attempts.
	mu sync.Mutex

	// attempts maps an IP string to the number of allow calls recorded for it
	// since the entry was last removed by reset.
	attempts map[string]int
}
// allow records one more attempt for ip and reports whether that attempt is
// still within the limit of five. Rejected calls are counted too, so the
// counter keeps growing until reset removes the entry.
func (p *pairRateLimit) allow(ip string) bool {
	const maxAttempts = 5

	p.mu.Lock()
	defer p.mu.Unlock()

	// Allocate lazily so the zero value of pairRateLimit is usable.
	if p.attempts == nil {
		p.attempts = map[string]int{}
	}

	count := p.attempts[ip] + 1
	p.attempts[ip] = count
	return count <= maxAttempts
}
// reset forgets every attempt recorded for ip, so the next allow call for that
// key starts counting from one again. Resetting a key that has no entry (or
// resetting before any allow call) does nothing.
func (p *pairRateLimit) reset(ip string) {
	p.mu.Lock()
	delete(p.attempts, ip) // no-op on a nil map or a missing key
	p.mu.Unlock()
}
// Server holds the state of one server instance. Values are built by
// NewServer, which fills in every field.
type Server struct {
	db  *sql.DB // database handle; NewServer opens it with the "sqlite3" driver
	cfg *Config // configuration value handed to NewServer

	tokens     *tokenStore     // created by newTokenStore in NewServer
	userTokens *userTokenStore // created by newUserTokenStore in NewServer

	blobsDir string         // the "blobs" directory NewServer creates under dataDir
	mux      *http.ServeMux // router returned by routes; served by ListenAndServe

	pairLimit *pairRateLimit // per-IP attempt counter
}
// auditLog inserts one row into server_audit_log.
//
// eventType, userID, deviceID, ip and msg are stored as given through bound
// parameters; created_at is the current UTC time formatted as RFC 3339.
//
// Logging is best-effort: a failed insert never interrupts the caller, but it
// is reported on stderr so that a broken audit trail does not go unnoticed.
func (s *Server) auditLog(eventType, userID, deviceID, ip, msg string) {
	const insertAudit = "INSERT INTO server_audit_log (event_type, user_id, device_id, ip, message, created_at) VALUES (?, ?, ?, ?, ?, ?)"

	createdAt := time.Now().UTC().Format(time.RFC3339)
	if _, err := s.db.Exec(insertAudit, eventType, userID, deviceID, ip, msg, createdAt); err != nil {
		fmt.Fprintf(os.Stderr, "audit log: recording %q event: %v\n", eventType, err)
	}
}
// NewServer opens the SQLite database at dbPath (mode=rwc, so the file is
// created when missing), applies serverSchema followed by the incremental
// migrations, ensures the directory <dataDir>/blobs exists, and returns a
// Server with its routes built.
//
// A failure to open the database, to run a schema statement, or to create the
// blob directory is returned wrapped with context, and the database handle is
// closed before returning. Migration statements are best-effort: "duplicate
// column name" errors are expected on up-to-date databases and are ignored;
// any other migration error is reported on stderr but does not abort startup.
func NewServer(dbPath, dataDir string, cfg *Config) (*Server, error) {
	db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=rwc", dbPath))
	if err != nil {
		return nil, fmt.Errorf("open db: %w", err)
	}
	// Restrict the pool to a single connection (presumably to serialize access
	// to the SQLite file — confirm before changing).
	db.SetMaxOpenConns(1)

	// Run schema: serverSchema is executed one semicolon-separated statement
	// at a time, skipping empty fragments.
	for _, stmt := range strings.Split(serverSchema, ";") {
		stmt = strings.TrimSpace(stmt)
		if stmt == "" {
			continue
		}
		if _, err := db.Exec(stmt); err != nil {
			db.Close()
			return nil, fmt.Errorf("schema: %w", err)
		}
	}

	// Migrations for older databases, executed in order. Each statement is
	// written so that re-running it on an already migrated database is
	// harmless (IF NOT EXISTS, or an ADD COLUMN that fails as a duplicate).
	migrations := []string{
		"ALTER TABLE server_users ADD COLUMN blocked INTEGER NOT NULL DEFAULT 0",
		"ALTER TABLE server_users ADD COLUMN last_seen TEXT",
		"ALTER TABLE server_devices ADD COLUMN token_hash TEXT",
		"ALTER TABLE server_devices ADD COLUMN token_prefix TEXT",
		"ALTER TABLE server_devices ADD COLUMN token_suffix TEXT",
		"ALTER TABLE server_devices ADD COLUMN user_id TEXT",
		"ALTER TABLE server_devices ADD COLUMN client_version TEXT",
		"ALTER TABLE server_devices ADD COLUMN last_ip TEXT",
		"ALTER TABLE server_devices ADD COLUMN revoked_at TEXT",
		// Migration: add server_sequence and tombstones.
		"ALTER TABLE server_ops ADD COLUMN server_sequence INTEGER",
		"CREATE INDEX IF NOT EXISTS idx_server_ops_sequence ON server_ops(server_sequence)",
		"CREATE INDEX IF NOT EXISTS idx_server_ops_entity ON server_ops(entity_type, entity_id)",
		`CREATE TABLE IF NOT EXISTS server_tombstones (
entity_type TEXT NOT NULL,
entity_id TEXT NOT NULL,
op_id TEXT NOT NULL,
deleted_at TEXT NOT NULL,
PRIMARY KEY (entity_type, entity_id)
)`,
		`CREATE TABLE IF NOT EXISTS server_idempotency_keys (
idempotency_key TEXT PRIMARY KEY,
response_json TEXT NOT NULL,
created_at TEXT NOT NULL
)`,
		`ALTER TABLE server_ops ADD COLUMN idempotency_key TEXT`,
		`ALTER TABLE server_ops ADD COLUMN client_sequence INTEGER DEFAULT 0`,
		`ALTER TABLE server_ops ADD COLUMN last_seen_server_seq INTEGER DEFAULT 0`,
	}
	for _, stmt := range migrations {
		_, err := db.Exec(stmt)
		if err == nil || strings.Contains(err.Error(), "duplicate column name") {
			// Applied now, or the column already exists from an earlier run.
			continue
		}
		// Keep starting up as before, but make the unexpected failure visible.
		fmt.Fprintf(os.Stderr, "migration %q: %v\n", stmt, err)
	}

	blobsDir := filepath.Join(dataDir, "blobs")
	if err := os.MkdirAll(blobsDir, 0750); err != nil {
		db.Close()
		return nil, fmt.Errorf("create blobs dir: %w", err)
	}

	s := &Server{
		db:         db,
		cfg:        cfg,
		tokens:     newTokenStore(),
		userTokens: newUserTokenStore(),
		blobsDir:   blobsDir,
		pairLimit:  &pairRateLimit{},
	}
	// routes is a method on the server, so the mux can only be built once
	// the struct exists.
	s.mux = s.routes()
	return s, nil
}
// locale returns the server's locale identifier. It is currently the fixed
// string "ru" and does not read any Server state.
func (s *Server) locale() string {
	const fixedLocale = "ru"
	return fixedLocale
}
// Close closes the server's database handle and returns whatever error that
// close reports.
func (s *Server) Close() error {
	closeErr := s.db.Close()
	return closeErr
}
// ListenAndServe listens on addr and serves the server's routes. It blocks
// while serving and returns the error from the underlying http.Server.
//
// An explicit http.Server is used instead of the http.ListenAndServe helper so
// that ReadHeaderTimeout can be set: without it a client that opens a
// connection and never finishes sending its request headers can hold that
// connection open indefinitely. Body read and write timeouts are deliberately
// left unset so that long-running transfers are not cut off.
func (s *Server) ListenAndServe(addr string) error {
	srv := &http.Server{
		Addr:              addr,
		Handler:           s.mux,
		ReadHeaderTimeout: 10 * time.Second,
	}
	return srv.ListenAndServe()
}