aether/cluster/leader.go
Hugo Nijhuis c757bb76f3
Make configuration values injectable rather than hardcoded
Add config structs with sensible defaults for tunable parameters:
- JetStreamConfig for stream retention (1 year) and replica count (1)
- HashRingConfig for virtual nodes per physical node (150)
- ShardConfig for shard count (1024) and replication factor (1)

Each component gets a new WithConfig constructor that accepts custom
configuration, while the original constructors continue to work with
defaults. Zero values in configs fall back to defaults for backward
compatibility.

Closes #38

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 15:33:56 +01:00
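
The pattern the commit message describes, as a minimal sketch: JetStreamConfig and its defaults follow the message above, while the withDefaults helper name is an assumption, not taken from the diff.

type JetStreamConfig struct {
	Retention time.Duration // stream retention; zero falls back to the 1-year default
	Replicas  int           // replica count; zero falls back to 1
}

// withDefaults applies the documented fallbacks so a zero-value config
// behaves exactly like the old hardcoded constructor.
func (c JetStreamConfig) withDefaults() JetStreamConfig {
	if c.Retention == 0 {
		c.Retention = 365 * 24 * time.Hour
	}
	if c.Replicas == 0 {
		c.Replicas = 1
	}
	return c
}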

415 lines
9.8 KiB
Go

package cluster
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/nats-io/nats.go"
)
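// The identifiers referenced below (ElectionTimeout, HeartbeatInterval,
// LeaderLeaseTimeout, LeadershipLease, LeaderElectionCallbacks) are defined
// elsewhere in this package. A plausible sketch for orientation only — the
// field and callback names match their use in this file, but the constant
// values and JSON tags are assumptions:
//
//	const (
//		ElectionTimeout    = 5 * time.Second  // how often non-leaders re-check
//		HeartbeatInterval  = 2 * time.Second  // lease renewal cadence
//		LeaderLeaseTimeout = 10 * time.Second // lease validity window
//	)
//
//	type LeadershipLease struct {
//		LeaderID  string    `json:"leader_id"`
//		Term      uint64    `json:"term"`
//		ExpiresAt time.Time `json:"expires_at"`
//		StartedAt time.Time `json:"started_at"`
//	}
//
//	type LeaderElectionCallbacks struct {
//		OnBecameLeader func()
//		OnLostLeader   func()
//		OnNewLeader    func(leaderID string)
//	}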
// LeaderElection manages NATS-based leader election using lease-based coordination
type LeaderElection struct {
nodeID string
natsConn *nats.Conn
js nats.JetStreamContext
kv nats.KeyValue
isLeader bool
currentLeader string
leaderTerm uint64
ctx context.Context
cancel context.CancelFunc
mutex sync.RWMutex
logger *log.Logger
callbacks LeaderElectionCallbacks
}
// NewLeaderElection creates a new NATS-based leader election system
func NewLeaderElection(nodeID string, natsConn *nats.Conn, callbacks LeaderElectionCallbacks) (*LeaderElection, error) {
ctx, cancel := context.WithCancel(context.Background())
// Create JetStream context
js, err := natsConn.JetStream()
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create JetStream context: %w", err)
}
// Create or get KV store for leader election
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "aether-leader-election",
Description: "Aether cluster leader election coordination",
TTL: LeaderLeaseTimeout * 2, // Auto-cleanup expired leases
MaxBytes: 1024 * 1024, // 1MB max
Replicas: 1, // Single replica for simplicity
})
if err != nil {
// Try to get existing KV store
kv, err = js.KeyValue("aether-leader-election")
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create/get KV store: %w", err)
}
}
return &LeaderElection{
nodeID: nodeID,
natsConn: natsConn,
js: js,
kv: kv,
ctx: ctx,
cancel: cancel,
logger: log.New(os.Stdout, fmt.Sprintf("[Leader %s] ", nodeID), log.LstdFlags),
callbacks: callbacks,
}, nil
}
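// Example wiring (a sketch; the NATS URL and callback bodies are placeholders):
//
//	nc, err := nats.Connect(nats.DefaultURL)
//	if err != nil {
//		log.Fatal(err)
//	}
//	le, err := NewLeaderElection("node-1", nc, LeaderElectionCallbacks{
//		OnBecameLeader: func() { log.Println("promoted to leader") },
//		OnLostLeader:   func() { log.Println("demoted") },
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	le.Start()
//	defer le.Stop()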
// Start begins the leader election process
func (le *LeaderElection) Start() {
le.logger.Printf("🗳️ Starting leader election")
// Start election loop in background
go le.electionLoop()
// Start lease renewal loop in background
go le.leaseRenewalLoop()
// Start leader monitoring
go le.monitorLeadership()
}
// Stop stops the leader election process
func (le *LeaderElection) Stop() {
le.logger.Printf("🛑 Stopping leader election")
le.cancel()
// If we're the leader, resign gracefully
if le.IsLeader() {
le.resignLeadership()
}
}
// IsLeader returns whether this node is currently the leader
func (le *LeaderElection) IsLeader() bool {
le.mutex.RLock()
defer le.mutex.RUnlock()
return le.isLeader
}
// GetLeader returns the current leader ID
func (le *LeaderElection) GetLeader() string {
le.mutex.RLock()
defer le.mutex.RUnlock()
return le.currentLeader
}
// GetTerm returns the current leadership term
func (le *LeaderElection) GetTerm() uint64 {
le.mutex.RLock()
defer le.mutex.RUnlock()
return le.leaderTerm
}
// electionLoop runs the main election process
func (le *LeaderElection) electionLoop() {
ticker := time.NewTicker(ElectionTimeout)
defer ticker.Stop()
// Try to become leader immediately
le.tryBecomeLeader()
for {
select {
case <-le.ctx.Done():
return
case <-ticker.C:
// Periodically check if we should try to become leader
if !le.IsLeader() && le.shouldTryElection() {
le.tryBecomeLeader()
}
}
}
}
// leaseRenewalLoop renews the leadership lease if we're the leader
func (le *LeaderElection) leaseRenewalLoop() {
ticker := time.NewTicker(HeartbeatInterval)
defer ticker.Stop()
for {
select {
case <-le.ctx.Done():
return
case <-ticker.C:
if le.IsLeader() {
if err := le.renewLease(); err != nil {
le.logger.Printf("❌ Failed to renew leadership lease: %v", err)
le.loseLeadership()
}
}
}
}
}
// monitorLeadership watches for leadership changes
func (le *LeaderElection) monitorLeadership() {
watcher, err := le.kv.Watch("leader")
if err != nil {
le.logger.Printf("❌ Failed to watch leadership: %v", err)
return
}
defer watcher.Stop()
for {
select {
case <-le.ctx.Done():
return
case entry := <-watcher.Updates():
if entry == nil {
continue
}
le.handleLeadershipUpdate(entry)
}
}
}
// tryBecomeLeader attempts to acquire leadership
func (le *LeaderElection) tryBecomeLeader() {
le.logger.Printf("🗳️ Attempting to become leader")
now := time.Now()
newLease := LeadershipLease{
LeaderID: le.nodeID,
Term: le.GetTerm() + 1, // read under the mutex to avoid a data race
ExpiresAt: now.Add(LeaderLeaseTimeout),
StartedAt: now,
}
leaseData, err := json.Marshal(newLease)
if err != nil {
le.logger.Printf("❌ Failed to marshal lease: %v", err)
return
}
// Try to create the leader key (atomic operation)
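// Create only succeeds when the key does not yet exist, so concurrent
// candidates race on a single atomic write and exactly one wins.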
_, err = le.kv.Create("leader", leaseData)
if err != nil {
// Leader key exists, check if it's expired
if le.tryClaimExpiredLease() {
return // Successfully claimed expired lease
}
// Another node is leader
return
}
// Successfully became leader!
le.becomeLeader(newLease.Term)
}
// tryClaimExpiredLease attempts to claim an expired leadership lease
func (le *LeaderElection) tryClaimExpiredLease() bool {
entry, err := le.kv.Get("leader")
if err != nil {
return false
}
var currentLease LeadershipLease
if err := json.Unmarshal(entry.Value(), &currentLease); err != nil {
return false
}
// Check if lease is expired
if time.Now().Before(currentLease.ExpiresAt) {
// Lease is still valid
le.updateCurrentLeader(currentLease.LeaderID, currentLease.Term)
return false
}
// Lease is expired, try to claim it
le.logger.Printf("🕐 Attempting to claim expired lease from %s", currentLease.LeaderID)
now := time.Now()
newLease := LeadershipLease{
LeaderID: le.nodeID,
Term: currentLease.Term + 1,
ExpiresAt: now.Add(LeaderLeaseTimeout),
StartedAt: now,
}
leaseData, err := json.Marshal(newLease)
if err != nil {
return false
}
// Atomically update the lease
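// Update is compare-and-swap on the KV revision: if another node modified
// "leader" after our Get, the revision no longer matches and the call fails,
// so at most one claimant takes over the expired lease.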
_, err = le.kv.Update("leader", leaseData, entry.Revision())
if err != nil {
return false
}
// Successfully claimed expired lease!
le.becomeLeader(newLease.Term)
return true
}
// renewLease renews the current leadership lease
func (le *LeaderElection) renewLease() error {
entry, err := le.kv.Get("leader")
if err != nil {
return err
}
var currentLease LeadershipLease
if err := json.Unmarshal(entry.Value(), &currentLease); err != nil {
return err
}
// Verify we're still the leader
if currentLease.LeaderID != le.nodeID {
return fmt.Errorf("no longer leader, current leader is %s", currentLease.LeaderID)
}
// Renew the lease
renewedLease := currentLease
renewedLease.ExpiresAt = time.Now().Add(LeaderLeaseTimeout)
leaseData, err := json.Marshal(renewedLease)
if err != nil {
return err
}
_, err = le.kv.Update("leader", leaseData, entry.Revision())
if err != nil {
return fmt.Errorf("failed to renew lease: %w", err)
}
le.logger.Printf("💓 Renewed leadership lease until %s", renewedLease.ExpiresAt.Format(time.RFC3339))
return nil
}
// becomeLeader handles becoming the cluster leader
func (le *LeaderElection) becomeLeader(term uint64) {
le.mutex.Lock()
le.isLeader = true
le.currentLeader = le.nodeID
le.leaderTerm = term
le.mutex.Unlock()
le.logger.Printf("👑 Became cluster leader (term %d)", term)
if le.callbacks.OnBecameLeader != nil {
le.callbacks.OnBecameLeader()
}
}
// loseLeadership handles losing leadership
func (le *LeaderElection) loseLeadership() {
le.mutex.Lock()
wasLeader := le.isLeader
le.isLeader = false
le.mutex.Unlock()
if wasLeader {
le.logger.Printf("📉 Lost cluster leadership")
if le.callbacks.OnLostLeader != nil {
le.callbacks.OnLostLeader()
}
}
}
// resignLeadership gracefully resigns from leadership
func (le *LeaderElection) resignLeadership() {
if !le.IsLeader() {
return
}
le.logger.Printf("👋 Resigning from cluster leadership")
// Delete the leadership key
err := le.kv.Delete("leader")
if err != nil {
le.logger.Printf("⚠️ Failed to delete leadership key: %v", err)
}
le.loseLeadership()
}
// shouldTryElection determines if this node should attempt to become leader
func (le *LeaderElection) shouldTryElection() bool {
// Always try if no current leader
if le.GetLeader() == "" {
return true
}
// Check if current lease is expired
entry, err := le.kv.Get("leader")
if err != nil {
// Can't read lease, try to become leader
return true
}
var currentLease LeadershipLease
if err := json.Unmarshal(entry.Value(), &currentLease); err != nil {
// Invalid lease, try to become leader
return true
}
// Try if lease is expired
return time.Now().After(currentLease.ExpiresAt)
}
// handleLeadershipUpdate processes leadership change notifications
func (le *LeaderElection) handleLeadershipUpdate(entry nats.KeyValueEntry) {
if entry.Operation() == nats.KeyValueDelete || entry.Operation() == nats.KeyValuePurge {
// Leadership was vacated
le.updateCurrentLeader("", 0)
return
}
var lease LeadershipLease
if err := json.Unmarshal(entry.Value(), &lease); err != nil {
le.logger.Printf("⚠️ Invalid leadership lease: %v", err)
return
}
le.updateCurrentLeader(lease.LeaderID, lease.Term)
}
// updateCurrentLeader updates the current leader information
func (le *LeaderElection) updateCurrentLeader(leaderID string, term uint64) {
isNowLeader := leaderID == le.nodeID
le.mutex.Lock()
oldLeader := le.currentLeader
wasLeader := le.isLeader
le.currentLeader = leaderID
le.leaderTerm = term
le.isLeader = isNowLeader
le.mutex.Unlock()
// Invoke the loss callback outside the lock so a callback that calls back
// into IsLeader or GetLeader cannot deadlock.
if wasLeader && !isNowLeader && le.callbacks.OnLostLeader != nil {
le.callbacks.OnLostLeader()
}
// Notify of leader change
if oldLeader != leaderID && leaderID != "" && leaderID != le.nodeID {
le.logger.Printf("🔄 New cluster leader: %s (term %d)", leaderID, term)
if le.callbacks.OnNewLeader != nil {
le.callbacks.OnNewLeader(leaderID)
}
}
}