aether/cluster/manager.go
Hugo Nijhuis c757bb76f3
Make configuration values injectable rather than hardcoded
Add config structs with sensible defaults for tunable parameters:
- JetStreamConfig for stream retention (1 year) and replica count (1)
- HashRingConfig for virtual nodes per physical node (150)
- ShardConfig for shard count (1024) and replication factor (1)

Each component gets a new WithConfig constructor that accepts custom
configuration, while the original constructors continue to work with
defaults. Zero values in configs fall back to defaults for backward
compatibility.
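
For illustration, the zero-value fallback pattern described above might look roughly like the sketch below; the field and constructor names are representative, not necessarily the exact diff.

    // Sketch only: names and the 150 default mirror the description above.
    type HashRingConfig struct {
    	VirtualNodes int // virtual nodes per physical node; zero means "use the default"
    }

    func DefaultHashRingConfig() HashRingConfig {
    	return HashRingConfig{VirtualNodes: 150}
    }

    type ConsistentHashRing struct {
    	virtualNodes int
    	// ... ring state ...
    }

    // NewConsistentHashRingWithConfig treats zero values as "use the default",
    // so configs that omit a field keep the original behavior.
    func NewConsistentHashRingWithConfig(cfg HashRingConfig) *ConsistentHashRing {
    	if cfg.VirtualNodes == 0 {
    		cfg.VirtualNodes = DefaultHashRingConfig().VirtualNodes
    	}
    	return &ConsistentHashRing{virtualNodes: cfg.VirtualNodes}
    }

    // NewConsistentHashRing keeps the original zero-argument constructor working.
    func NewConsistentHashRing() *ConsistentHashRing {
    	return NewConsistentHashRingWithConfig(HashRingConfig{})
    }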

Closes #38

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 15:33:56 +01:00

package cluster

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
)

// VMRegistry provides access to local VM information for cluster operations
type VMRegistry interface {
	GetActiveVMs() map[string]interface{} // VirtualMachine interface to avoid import cycles
	GetShard(actorID string) int
}
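
// A VMRegistry is normally implemented by the VM runtime. A minimal, purely
// illustrative shape (names are hypothetical, not from this repository):
//
//	type vmRuntime struct {
//		vms     map[string]interface{}
//		shardOf func(actorID string) int
//	}
//
//	func (r *vmRuntime) GetActiveVMs() map[string]interface{} { return r.vms }
//	func (r *vmRuntime) GetShard(actorID string) int          { return r.shardOf(actorID) }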

// ClusterManager coordinates distributed VM operations across the cluster
type ClusterManager struct {
	nodeID      string
	nodes       map[string]*NodeInfo
	nodeUpdates chan NodeUpdate
	shardMap    *ShardMap
	hashRing    *ConsistentHashRing
	election    *LeaderElection
	natsConn    *nats.Conn
	ctx         context.Context
	mutex       sync.RWMutex
	logger      *log.Logger
	vmRegistry  VMRegistry // Interface to access local VMs
}

// NewClusterManager creates a cluster coordination manager
func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context) (*ClusterManager, error) {
	cm := &ClusterManager{
		nodeID:      nodeID,
		nodes:       make(map[string]*NodeInfo),
		nodeUpdates: make(chan NodeUpdate, 100),
		shardMap:    &ShardMap{Shards: make(map[int][]string), Nodes: make(map[string]NodeInfo)},
		hashRing:    NewConsistentHashRing(),
		natsConn:    natsConn,
		ctx:         ctx,
		logger:      log.New(os.Stdout, fmt.Sprintf("[ClusterMgr %s] ", nodeID), log.LstdFlags),
		vmRegistry:  nil, // Will be set later via SetVMRegistry
	}

	// Create leadership election with callbacks
	callbacks := LeaderElectionCallbacks{
		OnBecameLeader: func() {
			cm.logger.Printf("👑 This node became the cluster leader - can initiate rebalancing")
		},
		OnLostLeader: func() {
			cm.logger.Printf("📉 This node lost cluster leadership")
		},
		OnNewLeader: func(leaderID string) {
			cm.logger.Printf("🔄 Cluster leadership changed to: %s", leaderID)
		},
	}

	election, err := NewLeaderElection(nodeID, natsConn, callbacks)
	if err != nil {
		return nil, fmt.Errorf("failed to create leader election: %w", err)
	}
	cm.election = election

	return cm, nil
}

// Start begins cluster management operations
func (cm *ClusterManager) Start() {
	cm.logger.Printf("🚀 Starting cluster manager")

	// Start leader election
	cm.election.Start()

	// Subscribe to cluster messages; log the error instead of silently
	// dropping it
	if _, err := cm.natsConn.Subscribe("aether.cluster.*", cm.handleClusterMessage); err != nil {
		cm.logger.Printf("⚠️ Failed to subscribe to cluster messages: %v", err)
	}

	// Start node monitoring
	go cm.monitorNodes()

	// Start shard rebalancing (only if leader)
	go cm.rebalanceLoop()
}

// Stop gracefully stops the cluster manager
func (cm *ClusterManager) Stop() {
	cm.logger.Printf("🛑 Stopping cluster manager")
	if cm.election != nil {
		cm.election.Stop()
	}
}

// IsLeader returns whether this node is the cluster leader
func (cm *ClusterManager) IsLeader() bool {
	if cm.election == nil {
		return false
	}
	return cm.election.IsLeader()
}

// GetLeader returns the current cluster leader ID
func (cm *ClusterManager) GetLeader() string {
	if cm.election == nil {
		return ""
	}
	return cm.election.GetLeader()
}

// SetVMRegistry sets the VM registry for accessing local VM information
func (cm *ClusterManager) SetVMRegistry(registry VMRegistry) {
	cm.vmRegistry = registry
}

// GetActorsInShard returns actors that belong to a specific shard on this node
func (cm *ClusterManager) GetActorsInShard(shardID int) []string {
	if cm.vmRegistry == nil {
		return []string{}
	}

	activeVMs := cm.vmRegistry.GetActiveVMs()
	var actors []string
	for actorID := range activeVMs {
		if cm.vmRegistry.GetShard(actorID) == shardID {
			actors = append(actors, actorID)
		}
	}
	return actors
}

// handleClusterMessage processes incoming cluster coordination messages
func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
	var clusterMsg ClusterMessage
	if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil {
		cm.logger.Printf("⚠️ Invalid cluster message: %v", err)
		return
	}

	switch clusterMsg.Type {
	case "rebalance":
		cm.handleRebalanceRequest(clusterMsg)
	case "migrate":
		cm.handleMigrationRequest(clusterMsg)
	case "node_update":
		// After json.Unmarshal the payload is a generic map, so a direct
		// type assertion to NodeUpdate would never succeed; re-marshal it
		// into the concrete type instead.
		raw, err := json.Marshal(clusterMsg.Payload)
		if err != nil {
			cm.logger.Printf("⚠️ Invalid node_update payload: %v", err)
			return
		}
		var update NodeUpdate
		if err := json.Unmarshal(raw, &update); err != nil {
			cm.logger.Printf("⚠️ Invalid node_update payload: %v", err)
			return
		}
		cm.handleNodeUpdate(update)
	default:
		cm.logger.Printf("⚠️ Unknown cluster message type: %s", clusterMsg.Type)
	}
}

// handleNodeUpdate processes node status updates
func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
	cm.mutex.Lock()

	switch update.Type {
	case NodeJoined:
		cm.nodes[update.Node.ID] = update.Node
		cm.hashRing.AddNode(update.Node.ID)
		cm.logger.Printf("Node joined: %s", update.Node.ID)
	case NodeLeft:
		delete(cm.nodes, update.Node.ID)
		cm.hashRing.RemoveNode(update.Node.ID)
		cm.logger.Printf("Node left: %s", update.Node.ID)
	case NodeUpdated:
		if node, exists := cm.nodes[update.Node.ID]; exists {
			// Update existing node info
			*node = *update.Node
		} else {
			// New node
			cm.nodes[update.Node.ID] = update.Node
			cm.hashRing.AddNode(update.Node.ID)
		}
	}

	// Check for failed nodes and mark them
	now := time.Now()
	for _, node := range cm.nodes {
		if now.Sub(node.LastSeen) > 90*time.Second && node.Status != NodeStatusFailed {
			node.Status = NodeStatusFailed
			cm.logger.Printf("❌ Node marked as failed: %s (last seen: %s)",
				node.ID, node.LastSeen.Format(time.RFC3339))
		}
	}

	activeNodeCount := 0
	for _, node := range cm.nodes {
		if node.Status == NodeStatusActive {
			activeNodeCount++
		}
	}

	// Release the write lock before rebalancing: triggerShardRebalancing
	// takes a read lock on the same (non-reentrant) RWMutex, so calling it
	// under the write lock would deadlock.
	cm.mutex.Unlock()

	// Simplified trigger: the leader rebalances on any topology change while
	// at least one node is active; a smarter check would compare the active
	// node set against the current shard assignment.
	if cm.IsLeader() && activeNodeCount > 0 {
		cm.triggerShardRebalancing("node topology changed")
	}
}

// handleRebalanceRequest processes cluster rebalancing requests
func (cm *ClusterManager) handleRebalanceRequest(msg ClusterMessage) {
	cm.logger.Printf("🔄 Handling rebalance request from %s", msg.From)
	// Implementation would handle the specific rebalancing logic
	// This is a simplified version
}

// handleMigrationRequest processes actor migration requests
func (cm *ClusterManager) handleMigrationRequest(msg ClusterMessage) {
	cm.logger.Printf("🚚 Handling migration request from %s", msg.From)
	// Implementation would handle the specific migration logic
	// This is a simplified version
}

// triggerShardRebalancing initiates shard rebalancing across the cluster
func (cm *ClusterManager) triggerShardRebalancing(reason string) {
	if !cm.IsLeader() {
		return // Only the leader can initiate rebalancing
	}
	cm.logger.Printf("⚖️ Triggering shard rebalancing: %s", reason)

	// Get active nodes
	var activeNodes []*NodeInfo
	cm.mutex.RLock()
	for _, node := range cm.nodes {
		if node.Status == NodeStatusActive {
			activeNodes = append(activeNodes, node)
		}
	}
	cm.mutex.RUnlock()

	if len(activeNodes) == 0 {
		cm.logger.Printf("⚠️ No active nodes available for rebalancing")
		return
	}

	// This would implement the actual rebalancing logic
	cm.logger.Printf("🎯 Would rebalance across %d active nodes", len(activeNodes))
}

// monitorNodes periodically checks node health and updates
func (cm *ClusterManager) monitorNodes() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			cm.checkNodeHealth()
		case <-cm.ctx.Done():
			return
		}
	}
}

// checkNodeHealth verifies the health of known nodes
func (cm *ClusterManager) checkNodeHealth() {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	now := time.Now()
	for _, node := range cm.nodes {
		if now.Sub(node.LastSeen) > 90*time.Second && node.Status == NodeStatusActive {
			node.Status = NodeStatusFailed
			cm.logger.Printf("💔 Node failed: %s", node.ID)
		}
	}
}

// rebalanceLoop runs periodic rebalancing checks (leader only)
func (cm *ClusterManager) rebalanceLoop() {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if cm.IsLeader() {
				cm.triggerShardRebalancing("periodic rebalance check")
			}
		case <-cm.ctx.Done():
			return
		}
	}
}

// GetNodes returns a copy of the current cluster nodes
func (cm *ClusterManager) GetNodes() map[string]*NodeInfo {
	cm.mutex.RLock()
	defer cm.mutex.RUnlock()

	nodes := make(map[string]*NodeInfo, len(cm.nodes))
	for id, node := range cm.nodes {
		// Copy each entry to prevent external mutation
		nodeCopy := *node
		nodes[id] = &nodeCopy
	}
	return nodes
}

// GetShardMap returns a deep copy of the current shard mapping so callers
// cannot mutate internal state
func (cm *ClusterManager) GetShardMap() *ShardMap {
	cm.mutex.RLock()
	defer cm.mutex.RUnlock()

	shards := make(map[int][]string, len(cm.shardMap.Shards))
	for id, owners := range cm.shardMap.Shards {
		shards[id] = append([]string(nil), owners...)
	}
	nodes := make(map[string]NodeInfo, len(cm.shardMap.Nodes))
	for id, node := range cm.shardMap.Nodes {
		nodes[id] = node
	}
	return &ShardMap{
		Version:    cm.shardMap.Version,
		Shards:     shards,
		Nodes:      nodes,
		UpdateTime: cm.shardMap.UpdateTime,
	}
}
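
For context, caller-side wiring of the manager might look like the sketch below; the node ID, NATS URL, import path, and registry value are placeholders, not code from this repository.

	package main

	import (
		"context"
		"log"

		"github.com/nats-io/nats.go"

		"aether/cluster" // import path assumed from the file location
	)

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		nc, err := nats.Connect(nats.DefaultURL) // placeholder URL
		if err != nil {
			log.Fatal(err)
		}
		defer nc.Close()

		cm, err := cluster.NewClusterManager("node-1", nc, ctx)
		if err != nil {
			log.Fatal(err)
		}
		defer cm.Stop()

		var registry cluster.VMRegistry // provided by the VM runtime in practice
		cm.SetVMRegistry(registry)
		cm.Start()

		<-ctx.Done()
	}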