package cluster import ( "context" "encoding/json" "fmt" "log" "os" "sync" "time" "github.com/nats-io/nats.go" ) // LeaderElection manages NATS-based leader election using lease-based coordination type LeaderElection struct { nodeID string natsConn *nats.Conn js nats.JetStreamContext kv nats.KeyValue isLeader bool currentLeader string leaderTerm uint64 ctx context.Context cancel context.CancelFunc mutex sync.RWMutex logger *log.Logger callbacks LeaderElectionCallbacks } // NewLeaderElection creates a new NATS-based leader election system func NewLeaderElection(nodeID string, natsConn *nats.Conn, callbacks LeaderElectionCallbacks) (*LeaderElection, error) { ctx, cancel := context.WithCancel(context.Background()) // Create JetStream context js, err := natsConn.JetStream() if err != nil { cancel() return nil, fmt.Errorf("failed to create JetStream context: %w", err) } // Create or get KV store for leader election kv, err := js.CreateKeyValue(&nats.KeyValueConfig{ Bucket: "aether-leader-election", Description: "Aether cluster leader election coordination", TTL: LeaderLeaseTimeout * 2, // Auto-cleanup expired leases MaxBytes: 1024 * 1024, // 1MB max Replicas: 1, // Single replica for simplicity }) if err != nil { // Try to get existing KV store kv, err = js.KeyValue("aether-leader-election") if err != nil { cancel() return nil, fmt.Errorf("failed to create/get KV store: %w", err) } } return &LeaderElection{ nodeID: nodeID, natsConn: natsConn, js: js, kv: kv, ctx: ctx, cancel: cancel, logger: log.New(os.Stdout, fmt.Sprintf("[Leader %s] ", nodeID), log.LstdFlags), callbacks: callbacks, }, nil } // Start begins the leader election process func (le *LeaderElection) Start() { le.logger.Printf("🗳️ Starting leader election") // Start election loop in background go le.electionLoop() // Start lease renewal loop in background go le.leaseRenewalLoop() // Start leader monitoring go le.monitorLeadership() } // Stop stops the leader election process func (le *LeaderElection) Stop() { le.logger.Printf("🛑 Stopping leader election") le.cancel() // If we're the leader, resign gracefully if le.IsLeader() { le.resignLeadership() } } // IsLeader returns whether this node is currently the leader func (le *LeaderElection) IsLeader() bool { le.mutex.RLock() defer le.mutex.RUnlock() return le.isLeader } // GetLeader returns the current leader ID func (le *LeaderElection) GetLeader() string { le.mutex.RLock() defer le.mutex.RUnlock() return le.currentLeader } // GetTerm returns the current leadership term func (le *LeaderElection) GetTerm() uint64 { le.mutex.RLock() defer le.mutex.RUnlock() return le.leaderTerm } // electionLoop runs the main election process func (le *LeaderElection) electionLoop() { ticker := time.NewTicker(ElectionTimeout) defer ticker.Stop() // Try to become leader immediately le.tryBecomeLeader() for { select { case <-le.ctx.Done(): return case <-ticker.C: // Periodically check if we should try to become leader if !le.IsLeader() && le.shouldTryElection() { le.tryBecomeLeader() } } } } // leaseRenewalLoop renews the leadership lease if we're the leader func (le *LeaderElection) leaseRenewalLoop() { ticker := time.NewTicker(HeartbeatInterval) defer ticker.Stop() for { select { case <-le.ctx.Done(): return case <-ticker.C: if le.IsLeader() { if err := le.renewLease(); err != nil { le.logger.Printf("❌ Failed to renew leadership lease: %v", err) le.loseLeadership() } } } } } // monitorLeadership watches for leadership changes func (le *LeaderElection) monitorLeadership() { watcher, err := le.kv.Watch("leader") if err != nil { le.logger.Printf("❌ Failed to watch leadership: %v", err) return } defer watcher.Stop() for { select { case <-le.ctx.Done(): return case entry := <-watcher.Updates(): if entry == nil { continue } le.handleLeadershipUpdate(entry) } } } // tryBecomeLeader attempts to acquire leadership func (le *LeaderElection) tryBecomeLeader() { le.logger.Printf("🗳️ Attempting to become leader") now := time.Now() newLease := LeadershipLease{ LeaderID: le.nodeID, Term: le.leaderTerm + 1, ExpiresAt: now.Add(LeaderLeaseTimeout), StartedAt: now, } leaseData, err := json.Marshal(newLease) if err != nil { le.logger.Printf("❌ Failed to marshal lease: %v", err) return } // Try to create the leader key (atomic operation) _, err = le.kv.Create("leader", leaseData) if err != nil { // Leader key exists, check if it's expired if le.tryClaimExpiredLease() { return // Successfully claimed expired lease } // Another node is leader return } // Successfully became leader! le.becomeLeader(newLease.Term) } // tryClaimExpiredLease attempts to claim an expired leadership lease func (le *LeaderElection) tryClaimExpiredLease() bool { entry, err := le.kv.Get("leader") if err != nil { return false } var currentLease LeadershipLease if err := json.Unmarshal(entry.Value(), ¤tLease); err != nil { return false } // Check if lease is expired if time.Now().Before(currentLease.ExpiresAt) { // Lease is still valid le.updateCurrentLeader(currentLease.LeaderID, currentLease.Term) return false } // Lease is expired, try to claim it le.logger.Printf("🕐 Attempting to claim expired lease from %s", currentLease.LeaderID) now := time.Now() newLease := LeadershipLease{ LeaderID: le.nodeID, Term: currentLease.Term + 1, ExpiresAt: now.Add(LeaderLeaseTimeout), StartedAt: now, } leaseData, err := json.Marshal(newLease) if err != nil { return false } // Atomically update the lease _, err = le.kv.Update("leader", leaseData, entry.Revision()) if err != nil { return false } // Successfully claimed expired lease! le.becomeLeader(newLease.Term) return true } // renewLease renews the current leadership lease func (le *LeaderElection) renewLease() error { entry, err := le.kv.Get("leader") if err != nil { return err } var currentLease LeadershipLease if err := json.Unmarshal(entry.Value(), ¤tLease); err != nil { return err } // Verify we're still the leader if currentLease.LeaderID != le.nodeID { return fmt.Errorf("no longer leader, current leader is %s", currentLease.LeaderID) } // Renew the lease renewedLease := currentLease renewedLease.ExpiresAt = time.Now().Add(LeaderLeaseTimeout) leaseData, err := json.Marshal(renewedLease) if err != nil { return err } _, err = le.kv.Update("leader", leaseData, entry.Revision()) if err != nil { return fmt.Errorf("failed to renew lease: %w", err) } le.logger.Printf("💓 Renewed leadership lease until %s", renewedLease.ExpiresAt.Format(time.RFC3339)) return nil } // becomeLeader handles becoming the cluster leader func (le *LeaderElection) becomeLeader(term uint64) { le.mutex.Lock() le.isLeader = true le.currentLeader = le.nodeID le.leaderTerm = term le.mutex.Unlock() le.logger.Printf("👑 Became cluster leader (term %d)", term) if le.callbacks.OnBecameLeader != nil { le.callbacks.OnBecameLeader() } } // loseLeadership handles losing leadership func (le *LeaderElection) loseLeadership() { le.mutex.Lock() wasLeader := le.isLeader le.isLeader = false le.mutex.Unlock() if wasLeader { le.logger.Printf("📉 Lost cluster leadership") if le.callbacks.OnLostLeader != nil { le.callbacks.OnLostLeader() } } } // resignLeadership gracefully resigns from leadership func (le *LeaderElection) resignLeadership() { if !le.IsLeader() { return } le.logger.Printf("👋 Resigning from cluster leadership") // Delete the leadership key err := le.kv.Delete("leader") if err != nil { le.logger.Printf("⚠️ Failed to delete leadership key: %v", err) } le.loseLeadership() } // shouldTryElection determines if this node should attempt to become leader func (le *LeaderElection) shouldTryElection() bool { // Always try if no current leader if le.GetLeader() == "" { return true } // Check if current lease is expired entry, err := le.kv.Get("leader") if err != nil { // Can't read lease, try to become leader return true } var currentLease LeadershipLease if err := json.Unmarshal(entry.Value(), ¤tLease); err != nil { // Invalid lease, try to become leader return true } // Try if lease is expired return time.Now().After(currentLease.ExpiresAt) } // handleLeadershipUpdate processes leadership change notifications func (le *LeaderElection) handleLeadershipUpdate(entry nats.KeyValueEntry) { if entry.Operation() == nats.KeyValueDelete { // Leadership was vacated le.updateCurrentLeader("", 0) return } var lease LeadershipLease if err := json.Unmarshal(entry.Value(), &lease); err != nil { le.logger.Printf("⚠️ Invalid leadership lease: %v", err) return } le.updateCurrentLeader(lease.LeaderID, lease.Term) } // updateCurrentLeader updates the current leader information func (le *LeaderElection) updateCurrentLeader(leaderID string, term uint64) { le.mutex.Lock() oldLeader := le.currentLeader le.currentLeader = leaderID le.leaderTerm = term // Update our leadership status if leaderID == le.nodeID { le.isLeader = true } else { if le.isLeader { le.isLeader = false le.mutex.Unlock() if le.callbacks.OnLostLeader != nil { le.callbacks.OnLostLeader() } le.mutex.Lock() } else { le.isLeader = false } } le.mutex.Unlock() // Notify of leader change if oldLeader != leaderID && leaderID != "" && leaderID != le.nodeID { le.logger.Printf("🔄 New cluster leader: %s (term %d)", leaderID, term) if le.callbacks.OnNewLeader != nil { le.callbacks.OnNewLeader(leaderID) } } }