Add config structs with sensible defaults for tunable parameters:

- JetStreamConfig for stream retention (1 year) and replica count (1)
- HashRingConfig for virtual nodes per physical node (150)
- ShardConfig for shard count (1024) and replication factor (1)

Each component gets a new WithConfig constructor that accepts custom configuration, while the original constructors continue to work with defaults. Zero values in configs fall back to defaults for backward compatibility.

Closes #38

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
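A minimal sketch of the zero-value-fallback pattern the message describes, shown for JetStreamConfig only. The manager type, field names, and constructor signatures are illustrative assumptions; only the config struct names, the default values, and the fallback behavior come from the commit message.

package cluster

import "time"

// JetStreamConfig holds tunable JetStream parameters (field names are illustrative).
type JetStreamConfig struct {
	Retention time.Duration // stream retention; default is one year
	Replicas  int           // stream replica count; default is 1
}

// DefaultJetStreamConfig returns the defaults applied when zero values are supplied.
func DefaultJetStreamConfig() JetStreamConfig {
	return JetStreamConfig{
		Retention: 365 * 24 * time.Hour,
		Replicas:  1,
	}
}

// JetStreamManager is a hypothetical stand-in for the component being configured.
type JetStreamManager struct {
	config JetStreamConfig
}

// NewJetStreamManager keeps working with defaults, unchanged for existing callers.
func NewJetStreamManager() *JetStreamManager {
	return NewJetStreamManagerWithConfig(JetStreamConfig{})
}

// NewJetStreamManagerWithConfig accepts custom configuration; zero values fall
// back to the defaults above for backward compatibility.
func NewJetStreamManagerWithConfig(cfg JetStreamConfig) *JetStreamManager {
	def := DefaultJetStreamConfig()
	if cfg.Retention == 0 {
		cfg.Retention = def.Retention
	}
	if cfg.Replicas == 0 {
		cfg.Replicas = def.Replicas
	}
	return &JetStreamManager{config: cfg}
}

HashRingConfig (150 virtual nodes per physical node by default) and ShardConfig (1024 shards, replication factor 1) would follow the same zero-value-fallback shape.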
415 lines
9.8 KiB
Go
package cluster

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
)

// LeaderElection manages NATS-based leader election using lease-based coordination
type LeaderElection struct {
	nodeID        string
	natsConn      *nats.Conn
	js            nats.JetStreamContext
	kv            nats.KeyValue
	isLeader      bool
	currentLeader string
	leaderTerm    uint64
	ctx           context.Context
	cancel        context.CancelFunc
	mutex         sync.RWMutex
	logger        *log.Logger
	callbacks     LeaderElectionCallbacks
}

// NewLeaderElection creates a new NATS-based leader election system
func NewLeaderElection(nodeID string, natsConn *nats.Conn, callbacks LeaderElectionCallbacks) (*LeaderElection, error) {
	ctx, cancel := context.WithCancel(context.Background())

	// Create JetStream context
	js, err := natsConn.JetStream()
	if err != nil {
		cancel()
		return nil, fmt.Errorf("failed to create JetStream context: %w", err)
	}

	// Create or get KV store for leader election
	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket:      "aether-leader-election",
		Description: "Aether cluster leader election coordination",
		TTL:         LeaderLeaseTimeout * 2, // Auto-cleanup expired leases
		MaxBytes:    1024 * 1024,            // 1MB max
		Replicas:    1,                      // Single replica for simplicity
	})
	if err != nil {
		// Try to get existing KV store
		kv, err = js.KeyValue("aether-leader-election")
		if err != nil {
			cancel()
			return nil, fmt.Errorf("failed to create/get KV store: %w", err)
		}
	}

	return &LeaderElection{
		nodeID:    nodeID,
		natsConn:  natsConn,
		js:        js,
		kv:        kv,
		ctx:       ctx,
		cancel:    cancel,
		logger:    log.New(os.Stdout, fmt.Sprintf("[Leader %s] ", nodeID), log.LstdFlags),
		callbacks: callbacks,
	}, nil
}

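// Illustrative wiring (a sketch, not part of this file's API; it assumes
// LeaderElectionCallbacks is a struct of optional funcs, as its use below suggests):
//
//	nc, err := nats.Connect(nats.DefaultURL)
//	if err != nil {
//		log.Fatal(err)
//	}
//	le, err := NewLeaderElection("node-1", nc, LeaderElectionCallbacks{
//		OnBecameLeader: func() { log.Println("promoted to leader") },
//		OnLostLeader:   func() { log.Println("lost leadership") },
//		OnNewLeader:    func(id string) { log.Printf("leader is now %s", id) },
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	le.Start()
//	defer le.Stop()
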
// Start begins the leader election process
func (le *LeaderElection) Start() {
	le.logger.Printf("🗳️ Starting leader election")

	// Start election loop in background
	go le.electionLoop()

	// Start lease renewal loop in background
	go le.leaseRenewalLoop()

	// Start leader monitoring
	go le.monitorLeadership()
}

// Stop stops the leader election process
func (le *LeaderElection) Stop() {
	le.logger.Printf("🛑 Stopping leader election")

	le.cancel()

	// If we're the leader, resign gracefully
	if le.IsLeader() {
		le.resignLeadership()
	}
}

// IsLeader returns whether this node is currently the leader
func (le *LeaderElection) IsLeader() bool {
	le.mutex.RLock()
	defer le.mutex.RUnlock()
	return le.isLeader
}

// GetLeader returns the current leader ID
func (le *LeaderElection) GetLeader() string {
	le.mutex.RLock()
	defer le.mutex.RUnlock()
	return le.currentLeader
}

// GetTerm returns the current leadership term
func (le *LeaderElection) GetTerm() uint64 {
	le.mutex.RLock()
	defer le.mutex.RUnlock()
	return le.leaderTerm
}

// electionLoop runs the main election process
func (le *LeaderElection) electionLoop() {
	ticker := time.NewTicker(ElectionTimeout)
	defer ticker.Stop()

	// Try to become leader immediately
	le.tryBecomeLeader()

	for {
		select {
		case <-le.ctx.Done():
			return
		case <-ticker.C:
			// Periodically check if we should try to become leader
			if !le.IsLeader() && le.shouldTryElection() {
				le.tryBecomeLeader()
			}
		}
	}
}

// leaseRenewalLoop renews the leadership lease if we're the leader
func (le *LeaderElection) leaseRenewalLoop() {
	ticker := time.NewTicker(HeartbeatInterval)
	defer ticker.Stop()

	for {
		select {
		case <-le.ctx.Done():
			return
		case <-ticker.C:
			if le.IsLeader() {
				if err := le.renewLease(); err != nil {
					le.logger.Printf("❌ Failed to renew leadership lease: %v", err)
					le.loseLeadership()
				}
			}
		}
	}
}

// monitorLeadership watches for leadership changes
func (le *LeaderElection) monitorLeadership() {
	watcher, err := le.kv.Watch("leader")
	if err != nil {
		le.logger.Printf("❌ Failed to watch leadership: %v", err)
		return
	}
	defer watcher.Stop()

	for {
		select {
		case <-le.ctx.Done():
			return
		case entry, ok := <-watcher.Updates():
			if !ok {
				// Watcher channel closed; stop monitoring instead of spinning
				return
			}
			if entry == nil {
				// nil marks the end of the initial values; keep watching
				continue
			}
			le.handleLeadershipUpdate(entry)
		}
	}
}

// tryBecomeLeader attempts to acquire leadership
func (le *LeaderElection) tryBecomeLeader() {
	le.logger.Printf("🗳️ Attempting to become leader")

	now := time.Now()
	newLease := LeadershipLease{
		LeaderID:  le.nodeID,
		Term:      le.GetTerm() + 1, // read the term under the read lock to avoid a data race
		ExpiresAt: now.Add(LeaderLeaseTimeout),
		StartedAt: now,
	}

	leaseData, err := json.Marshal(newLease)
	if err != nil {
		le.logger.Printf("❌ Failed to marshal lease: %v", err)
		return
	}

	// Try to create the leader key (atomic operation)
	_, err = le.kv.Create("leader", leaseData)
	if err != nil {
		// Leader key exists, check if it's expired
		if le.tryClaimExpiredLease() {
			return // Successfully claimed expired lease
		}
		// Another node is leader
		return
	}

	// Successfully became leader!
	le.becomeLeader(newLease.Term)
}

// tryClaimExpiredLease attempts to claim an expired leadership lease
func (le *LeaderElection) tryClaimExpiredLease() bool {
	entry, err := le.kv.Get("leader")
	if err != nil {
		return false
	}

	var currentLease LeadershipLease
	if err := json.Unmarshal(entry.Value(), &currentLease); err != nil {
		return false
	}

	// Check if lease is expired
	if time.Now().Before(currentLease.ExpiresAt) {
		// Lease is still valid
		le.updateCurrentLeader(currentLease.LeaderID, currentLease.Term)
		return false
	}

	// Lease is expired, try to claim it
	le.logger.Printf("🕐 Attempting to claim expired lease from %s", currentLease.LeaderID)

	now := time.Now()
	newLease := LeadershipLease{
		LeaderID:  le.nodeID,
		Term:      currentLease.Term + 1,
		ExpiresAt: now.Add(LeaderLeaseTimeout),
		StartedAt: now,
	}

	leaseData, err := json.Marshal(newLease)
	if err != nil {
		return false
	}

	// Atomically update the lease
	_, err = le.kv.Update("leader", leaseData, entry.Revision())
	if err != nil {
		return false
	}

	// Successfully claimed expired lease!
	le.becomeLeader(newLease.Term)
	return true
}

// renewLease renews the current leadership lease
func (le *LeaderElection) renewLease() error {
	entry, err := le.kv.Get("leader")
	if err != nil {
		return err
	}

	var currentLease LeadershipLease
	if err := json.Unmarshal(entry.Value(), &currentLease); err != nil {
		return err
	}

	// Verify we're still the leader
	if currentLease.LeaderID != le.nodeID {
		return fmt.Errorf("no longer leader, current leader is %s", currentLease.LeaderID)
	}

	// Renew the lease
	renewedLease := currentLease
	renewedLease.ExpiresAt = time.Now().Add(LeaderLeaseTimeout)

	leaseData, err := json.Marshal(renewedLease)
	if err != nil {
		return err
	}

	_, err = le.kv.Update("leader", leaseData, entry.Revision())
	if err != nil {
		return fmt.Errorf("failed to renew lease: %w", err)
	}

	le.logger.Printf("💓 Renewed leadership lease until %s", renewedLease.ExpiresAt.Format(time.RFC3339))
	return nil
}

// becomeLeader handles becoming the cluster leader
func (le *LeaderElection) becomeLeader(term uint64) {
	le.mutex.Lock()
	le.isLeader = true
	le.currentLeader = le.nodeID
	le.leaderTerm = term
	le.mutex.Unlock()

	le.logger.Printf("👑 Became cluster leader (term %d)", term)

	if le.callbacks.OnBecameLeader != nil {
		le.callbacks.OnBecameLeader()
	}
}

// loseLeadership handles losing leadership
func (le *LeaderElection) loseLeadership() {
	le.mutex.Lock()
	wasLeader := le.isLeader
	le.isLeader = false
	le.mutex.Unlock()

	if wasLeader {
		le.logger.Printf("📉 Lost cluster leadership")
		if le.callbacks.OnLostLeader != nil {
			le.callbacks.OnLostLeader()
		}
	}
}

// resignLeadership gracefully resigns from leadership
func (le *LeaderElection) resignLeadership() {
	if !le.IsLeader() {
		return
	}

	le.logger.Printf("👋 Resigning from cluster leadership")

	// Delete the leadership key
	err := le.kv.Delete("leader")
	if err != nil {
		le.logger.Printf("⚠️ Failed to delete leadership key: %v", err)
	}

	le.loseLeadership()
}

// shouldTryElection determines if this node should attempt to become leader
func (le *LeaderElection) shouldTryElection() bool {
	// Always try if no current leader
	if le.GetLeader() == "" {
		return true
	}

	// Check if current lease is expired
	entry, err := le.kv.Get("leader")
	if err != nil {
		// Can't read lease, try to become leader
		return true
	}

	var currentLease LeadershipLease
	if err := json.Unmarshal(entry.Value(), &currentLease); err != nil {
		// Invalid lease, try to become leader
		return true
	}

	// Try if lease is expired
	return time.Now().After(currentLease.ExpiresAt)
}

// handleLeadershipUpdate processes leadership change notifications
func (le *LeaderElection) handleLeadershipUpdate(entry nats.KeyValueEntry) {
	if entry.Operation() == nats.KeyValueDelete {
		// Leadership was vacated
		le.updateCurrentLeader("", 0)
		return
	}

	var lease LeadershipLease
	if err := json.Unmarshal(entry.Value(), &lease); err != nil {
		le.logger.Printf("⚠️ Invalid leadership lease: %v", err)
		return
	}

	le.updateCurrentLeader(lease.LeaderID, lease.Term)
}

// updateCurrentLeader updates the current leader information
func (le *LeaderElection) updateCurrentLeader(leaderID string, term uint64) {
	le.mutex.Lock()
	oldLeader := le.currentLeader
	le.currentLeader = leaderID
	le.leaderTerm = term

	// Update our leadership status
	if leaderID == le.nodeID {
		le.isLeader = true
	} else {
		if le.isLeader {
			le.isLeader = false
			// Release the lock while invoking the callback so it can safely
			// call back into this object without deadlocking
			le.mutex.Unlock()
			if le.callbacks.OnLostLeader != nil {
				le.callbacks.OnLostLeader()
			}
			le.mutex.Lock()
		} else {
			le.isLeader = false
		}
	}
	le.mutex.Unlock()

	// Notify of leader change
	if oldLeader != leaderID && leaderID != "" && leaderID != le.nodeID {
		le.logger.Printf("🔄 New cluster leader: %s (term %d)", leaderID, term)
		if le.callbacks.OnNewLeader != nil {
			le.callbacks.OnNewLeader(leaderID)
		}
	}
}