Files
aether/cluster/manager.go
Hugo Nijhuis 8c02b63dc7
All checks were successful
CI / build (pull_request) Successful in 16s
Replace interface{} with properly defined interfaces
- Add VirtualMachine interface with GetID(), GetActorID(), GetState()
- Add VMState type with idle/running/paused/stopped states
- Add RuntimeModel interface for event storming model contracts
- Add RuntimeMessage interface for actor message contracts
- Add VMProvider interface for decoupled VM access
- Update VMRegistry.GetActiveVMs() to return map[string]VirtualMachine
- Update Runtime interface to use RuntimeModel and RuntimeMessage
- Update DistributedVMRegistry to use VMProvider instead of interface{}
- Add SetVMProvider method to DistributedVM for dependency injection

This improves type safety, makes contracts explicit, and enables better
IDE support while avoiding import cycles through interface segregation.

Closes #37

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 15:32:55 +01:00

335 lines
8.7 KiB
Go

package cluster
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/nats-io/nats.go"
)
// VMRegistry provides access to local VM information for cluster operations.
// Implementations must provide thread-safe access to VM data.
type VMRegistry interface {
// GetActiveVMs returns a map of actor IDs to their VirtualMachine instances
GetActiveVMs() map[string]VirtualMachine
// GetShard returns the shard number for a given actor ID
GetShard(actorID string) int
}
// ClusterManager coordinates distributed VM operations across the cluster
type ClusterManager struct {
nodeID string
nodes map[string]*NodeInfo
nodeUpdates chan NodeUpdate
shardMap *ShardMap
hashRing *ConsistentHashRing
election *LeaderElection
natsConn *nats.Conn
ctx context.Context
mutex sync.RWMutex
logger *log.Logger
vmRegistry VMRegistry // Interface to access local VMs
}
// NewClusterManager creates a cluster coordination manager
func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context) (*ClusterManager, error) {
cm := &ClusterManager{
nodeID: nodeID,
nodes: make(map[string]*NodeInfo),
nodeUpdates: make(chan NodeUpdate, 100),
shardMap: &ShardMap{Shards: make(map[int][]string), Nodes: make(map[string]NodeInfo)},
hashRing: NewConsistentHashRing(),
natsConn: natsConn,
ctx: ctx,
logger: log.New(os.Stdout, fmt.Sprintf("[ClusterMgr %s] ", nodeID), log.LstdFlags),
vmRegistry: nil, // Will be set later via SetVMRegistry
}
// Create leadership election with callbacks
callbacks := LeaderElectionCallbacks{
OnBecameLeader: func() {
cm.logger.Printf("This node became the cluster leader - can initiate rebalancing")
},
OnLostLeader: func() {
cm.logger.Printf("This node lost cluster leadership")
},
OnNewLeader: func(leaderID string) {
cm.logger.Printf("Cluster leadership changed to: %s", leaderID)
},
}
election, err := NewLeaderElection(nodeID, natsConn, callbacks)
if err != nil {
return nil, fmt.Errorf("failed to create leader election: %w", err)
}
cm.election = election
return cm, nil
}
// Start begins cluster management operations
func (cm *ClusterManager) Start() {
cm.logger.Printf("Starting cluster manager")
// Start leader election
cm.election.Start()
// Subscribe to cluster messages
cm.natsConn.Subscribe("aether.cluster.*", cm.handleClusterMessage)
// Start node monitoring
go cm.monitorNodes()
// Start shard rebalancing (only if leader)
go cm.rebalanceLoop()
}
// Stop gracefully stops the cluster manager
func (cm *ClusterManager) Stop() {
cm.logger.Printf("Stopping cluster manager")
if cm.election != nil {
cm.election.Stop()
}
}
// IsLeader returns whether this node is the cluster leader
func (cm *ClusterManager) IsLeader() bool {
if cm.election == nil {
return false
}
return cm.election.IsLeader()
}
// GetLeader returns the current cluster leader ID
func (cm *ClusterManager) GetLeader() string {
if cm.election == nil {
return ""
}
return cm.election.GetLeader()
}
// SetVMRegistry sets the VM registry for accessing local VM information
func (cm *ClusterManager) SetVMRegistry(registry VMRegistry) {
cm.vmRegistry = registry
}
// GetActorsInShard returns actors that belong to a specific shard on this node
func (cm *ClusterManager) GetActorsInShard(shardID int) []string {
if cm.vmRegistry == nil {
return []string{}
}
activeVMs := cm.vmRegistry.GetActiveVMs()
var actors []string
for actorID := range activeVMs {
if cm.vmRegistry.GetShard(actorID) == shardID {
actors = append(actors, actorID)
}
}
return actors
}
// handleClusterMessage processes incoming cluster coordination messages
func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
var clusterMsg ClusterMessage
if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil {
cm.logger.Printf("Invalid cluster message: %v", err)
return
}
switch clusterMsg.Type {
case "rebalance":
cm.handleRebalanceRequest(clusterMsg)
case "migrate":
cm.handleMigrationRequest(clusterMsg)
case "node_update":
if update, ok := clusterMsg.Payload.(NodeUpdate); ok {
cm.handleNodeUpdate(update)
}
default:
cm.logger.Printf("Unknown cluster message type: %s", clusterMsg.Type)
}
}
// handleNodeUpdate processes node status updates
func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
cm.mutex.Lock()
defer cm.mutex.Unlock()
switch update.Type {
case NodeJoined:
cm.nodes[update.Node.ID] = update.Node
cm.hashRing.AddNode(update.Node.ID)
cm.logger.Printf("Node joined: %s", update.Node.ID)
case NodeLeft:
delete(cm.nodes, update.Node.ID)
cm.hashRing.RemoveNode(update.Node.ID)
cm.logger.Printf("Node left: %s", update.Node.ID)
case NodeUpdated:
if node, exists := cm.nodes[update.Node.ID]; exists {
// Update existing node info
*node = *update.Node
} else {
// New node
cm.nodes[update.Node.ID] = update.Node
cm.hashRing.AddNode(update.Node.ID)
}
}
// Check for failed nodes and mark them
now := time.Now()
for _, node := range cm.nodes {
if now.Sub(node.LastSeen) > 90*time.Second && node.Status != NodeStatusFailed {
node.Status = NodeStatusFailed
cm.logger.Printf("Node marked as failed: %s (last seen: %s)",
node.ID, node.LastSeen.Format(time.RFC3339))
}
}
// Trigger rebalancing if we're the leader and there are significant changes
if cm.IsLeader() {
activeNodeCount := 0
for _, node := range cm.nodes {
if node.Status == NodeStatusActive {
activeNodeCount++
}
}
// Simple trigger: rebalance if we have different number of active nodes
// than shards assigned (this is a simplified logic)
if activeNodeCount > 0 {
cm.triggerShardRebalancing("node topology changed")
}
}
}
// handleRebalanceRequest processes cluster rebalancing requests
func (cm *ClusterManager) handleRebalanceRequest(msg ClusterMessage) {
cm.logger.Printf("Handling rebalance request from %s", msg.From)
// Implementation would handle the specific rebalancing logic
// This is a simplified version
}
// handleMigrationRequest processes actor migration requests
func (cm *ClusterManager) handleMigrationRequest(msg ClusterMessage) {
cm.logger.Printf("Handling migration request from %s", msg.From)
// Implementation would handle the specific migration logic
// This is a simplified version
}
// triggerShardRebalancing initiates shard rebalancing across the cluster
func (cm *ClusterManager) triggerShardRebalancing(reason string) {
if !cm.IsLeader() {
return // Only leader can initiate rebalancing
}
cm.logger.Printf("Triggering shard rebalancing: %s", reason)
// Get active nodes
var activeNodes []*NodeInfo
cm.mutex.RLock()
for _, node := range cm.nodes {
if node.Status == NodeStatusActive {
activeNodes = append(activeNodes, node)
}
}
cm.mutex.RUnlock()
if len(activeNodes) == 0 {
cm.logger.Printf("No active nodes available for rebalancing")
return
}
// This would implement the actual rebalancing logic
cm.logger.Printf("Would rebalance across %d active nodes", len(activeNodes))
}
// monitorNodes periodically checks node health and updates
func (cm *ClusterManager) monitorNodes() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Health check logic would go here
cm.checkNodeHealth()
case <-cm.ctx.Done():
return
}
}
}
// checkNodeHealth verifies the health of known nodes
func (cm *ClusterManager) checkNodeHealth() {
cm.mutex.Lock()
defer cm.mutex.Unlock()
now := time.Now()
for _, node := range cm.nodes {
if now.Sub(node.LastSeen) > 90*time.Second && node.Status == NodeStatusActive {
node.Status = NodeStatusFailed
cm.logger.Printf("Node failed: %s", node.ID)
}
}
}
// rebalanceLoop runs periodic rebalancing checks (leader only)
func (cm *ClusterManager) rebalanceLoop() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if cm.IsLeader() {
cm.triggerShardRebalancing("periodic rebalance check")
}
case <-cm.ctx.Done():
return
}
}
}
// GetNodes returns a copy of the current cluster nodes
func (cm *ClusterManager) GetNodes() map[string]*NodeInfo {
cm.mutex.RLock()
defer cm.mutex.RUnlock()
nodes := make(map[string]*NodeInfo)
for id, node := range cm.nodes {
// Create a copy to prevent external mutation
nodeCopy := *node
nodes[id] = &nodeCopy
}
return nodes
}
// GetShardMap returns the current shard mapping
func (cm *ClusterManager) GetShardMap() *ShardMap {
cm.mutex.RLock()
defer cm.mutex.RUnlock()
// Return a copy to prevent external mutation
return &ShardMap{
Version: cm.shardMap.Version,
Shards: make(map[int][]string),
Nodes: make(map[string]NodeInfo),
UpdateTime: cm.shardMap.UpdateTime,
}
}