package cluster

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
)

// VMRegistry provides access to local VM information for cluster operations.
// Implementations must provide thread-safe access to VM data.
type VMRegistry interface {
	// GetActiveVMs returns a map of actor IDs to their VirtualMachine instances
	GetActiveVMs() map[string]VirtualMachine

	// GetShard returns the shard number for a given actor ID
	GetShard(actorID string) int
}

// ClusterManager coordinates distributed VM operations across the cluster
type ClusterManager struct {
	nodeID      string
	nodes       map[string]*NodeInfo
	nodeUpdates chan NodeUpdate
	shardMap    *ShardMap
	hashRing    *ConsistentHashRing
	election    *LeaderElection
	natsConn    *nats.Conn
	ctx         context.Context
	mutex       sync.RWMutex
	logger      *log.Logger
	vmRegistry  VMRegistry // Interface to access local VMs
}

// NewClusterManager creates a cluster coordination manager
func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context) (*ClusterManager, error) {
	cm := &ClusterManager{
		nodeID:      nodeID,
		nodes:       make(map[string]*NodeInfo),
		nodeUpdates: make(chan NodeUpdate, 100),
		shardMap:    &ShardMap{Shards: make(map[int][]string), Nodes: make(map[string]NodeInfo)},
		hashRing:    NewConsistentHashRing(),
		natsConn:    natsConn,
		ctx:         ctx,
		logger:      log.New(os.Stdout, fmt.Sprintf("[ClusterMgr %s] ", nodeID), log.LstdFlags),
		vmRegistry:  nil, // Will be set later via SetVMRegistry
	}

	// Create leadership election with callbacks
	callbacks := LeaderElectionCallbacks{
		OnBecameLeader: func() {
			cm.logger.Printf("This node became the cluster leader - can initiate rebalancing")
		},
		OnLostLeader: func() {
			cm.logger.Printf("This node lost cluster leadership")
		},
		OnNewLeader: func(leaderID string) {
			cm.logger.Printf("Cluster leadership changed to: %s", leaderID)
		},
	}

	election, err := NewLeaderElection(nodeID, natsConn, callbacks)
	if err != nil {
		return nil, fmt.Errorf("failed to create leader election: %w", err)
	}
	cm.election = election

	return cm, nil
}

// Start begins cluster management operations
func (cm *ClusterManager) Start() {
	cm.logger.Printf("Starting cluster manager")

	// Start leader election
	cm.election.Start()

	// Subscribe to cluster messages
	if _, err := cm.natsConn.Subscribe("aether.cluster.*", cm.handleClusterMessage); err != nil {
		cm.logger.Printf("Failed to subscribe to cluster messages: %v", err)
	}

	// Start node monitoring
	go cm.monitorNodes()

	// Start shard rebalancing (only if leader)
	go cm.rebalanceLoop()
}

// Stop gracefully stops the cluster manager
func (cm *ClusterManager) Stop() {
	cm.logger.Printf("Stopping cluster manager")

	if cm.election != nil {
		cm.election.Stop()
	}
}

// IsLeader returns whether this node is the cluster leader
func (cm *ClusterManager) IsLeader() bool {
	if cm.election == nil {
		return false
	}
	return cm.election.IsLeader()
}

// GetLeader returns the current cluster leader ID
func (cm *ClusterManager) GetLeader() string {
	if cm.election == nil {
		return ""
	}
	return cm.election.GetLeader()
}

// SetVMRegistry sets the VM registry for accessing local VM information
func (cm *ClusterManager) SetVMRegistry(registry VMRegistry) {
	cm.vmRegistry = registry
}

// GetActorsInShard returns actors that belong to a specific shard on this node
func (cm *ClusterManager) GetActorsInShard(shardID int) []string {
	if cm.vmRegistry == nil {
		return []string{}
	}

	activeVMs := cm.vmRegistry.GetActiveVMs()
	var actors []string

	for actorID := range activeVMs {
		if cm.vmRegistry.GetShard(actorID) == shardID {
			actors = append(actors, actorID)
		}
	}

	return actors
}
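// exampleVMRegistry is a minimal sketch of a VMRegistry implementation, for
// illustration only. It assumes a fixed shard count and a plain in-memory map
// of running VMs; the real registry in this package may track VMs and derive
// shard assignments differently (e.g. via the shard map or hash ring).
type exampleVMRegistry struct {
	mu        sync.RWMutex
	vms       map[string]VirtualMachine
	numShards int // assumed fixed shard count for the cluster
}

// GetActiveVMs returns a copy of the VM map so callers cannot mutate internal state.
func (r *exampleVMRegistry) GetActiveVMs() map[string]VirtualMachine {
	r.mu.RLock()
	defer r.mu.RUnlock()

	out := make(map[string]VirtualMachine, len(r.vms))
	for id, vm := range r.vms {
		out[id] = vm
	}
	return out
}

// GetShard maps an actor ID to a shard with a simple polynomial string hash.
// This stands in for whatever shard-assignment scheme the package actually uses.
func (r *exampleVMRegistry) GetShard(actorID string) int {
	if r.numShards <= 0 {
		return 0
	}
	sum := 0
	for _, c := range actorID {
		sum = (sum*31 + int(c)) % r.numShards
	}
	return sum
}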
// handleClusterMessage processes incoming cluster coordination messages
func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
	var clusterMsg ClusterMessage
	if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil {
		cm.logger.Printf("Invalid cluster message: %v", err)
		return
	}

	switch clusterMsg.Type {
	case "rebalance":
		cm.handleRebalanceRequest(clusterMsg)
	case "migrate":
		cm.handleMigrationRequest(clusterMsg)
	case "node_update":
		if update, ok := clusterMsg.Payload.(NodeUpdate); ok {
			cm.handleNodeUpdate(update)
		}
	default:
		cm.logger.Printf("Unknown cluster message type: %s", clusterMsg.Type)
	}
}

// handleNodeUpdate processes node status updates
func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	switch update.Type {
	case NodeJoined:
		cm.nodes[update.Node.ID] = update.Node
		cm.hashRing.AddNode(update.Node.ID)
		cm.logger.Printf("Node joined: %s", update.Node.ID)
	case NodeLeft:
		delete(cm.nodes, update.Node.ID)
		cm.hashRing.RemoveNode(update.Node.ID)
		cm.logger.Printf("Node left: %s", update.Node.ID)
	case NodeUpdated:
		if node, exists := cm.nodes[update.Node.ID]; exists {
			// Update existing node info
			*node = *update.Node
		} else {
			// New node
			cm.nodes[update.Node.ID] = update.Node
			cm.hashRing.AddNode(update.Node.ID)
		}
	}

	// Check for failed nodes and mark them
	now := time.Now()
	for _, node := range cm.nodes {
		if now.Sub(node.LastSeen) > 90*time.Second && node.Status != NodeStatusFailed {
			node.Status = NodeStatusFailed
			cm.logger.Printf("Node marked as failed: %s (last seen: %s)",
				node.ID, node.LastSeen.Format(time.RFC3339))
		}
	}

	// Trigger rebalancing if we're the leader and there are significant changes
	if cm.IsLeader() {
		activeNodeCount := 0
		for _, node := range cm.nodes {
			if node.Status == NodeStatusActive {
				activeNodeCount++
			}
		}

		// Simplified trigger: rebalance whenever at least one active node
		// remains after a topology change.
		if activeNodeCount > 0 {
			// Run asynchronously: triggerShardRebalancing acquires a read lock,
			// and this goroutine still holds the write lock until it returns.
			go cm.triggerShardRebalancing("node topology changed")
		}
	}
}

// handleRebalanceRequest processes cluster rebalancing requests
func (cm *ClusterManager) handleRebalanceRequest(msg ClusterMessage) {
	cm.logger.Printf("Handling rebalance request from %s", msg.From)
	// Implementation would handle the specific rebalancing logic
	// This is a simplified version
}

// handleMigrationRequest processes actor migration requests
func (cm *ClusterManager) handleMigrationRequest(msg ClusterMessage) {
	cm.logger.Printf("Handling migration request from %s", msg.From)
	// Implementation would handle the specific migration logic
	// This is a simplified version
}

// triggerShardRebalancing initiates shard rebalancing across the cluster
func (cm *ClusterManager) triggerShardRebalancing(reason string) {
	if !cm.IsLeader() {
		return // Only the leader can initiate rebalancing
	}

	cm.logger.Printf("Triggering shard rebalancing: %s", reason)

	// Get active nodes
	var activeNodes []*NodeInfo
	cm.mutex.RLock()
	for _, node := range cm.nodes {
		if node.Status == NodeStatusActive {
			activeNodes = append(activeNodes, node)
		}
	}
	cm.mutex.RUnlock()

	if len(activeNodes) == 0 {
		cm.logger.Printf("No active nodes available for rebalancing")
		return
	}

	// This would implement the actual rebalancing logic
	cm.logger.Printf("Would rebalance across %d active nodes", len(activeNodes))
}

// monitorNodes periodically checks node health and updates
func (cm *ClusterManager) monitorNodes() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Health check logic would go here
			cm.checkNodeHealth()
		case <-cm.ctx.Done():
			return
		}
	}
}
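// publishNodeUpdateExample is a hedged sketch of the sending side of the
// "node_update" messages consumed above. It assumes ClusterMessage exposes the
// Type, From, and Payload fields and NodeUpdate the Type and Node fields, as
// used elsewhere in this file; the subject "aether.cluster.update" is an
// assumption chosen only to match the "aether.cluster.*" subscription.
func publishNodeUpdateExample(nc *nats.Conn, self *NodeInfo) error {
	self.LastSeen = time.Now()
	self.Status = NodeStatusActive

	msg := ClusterMessage{
		Type:    "node_update",
		From:    self.ID,
		Payload: NodeUpdate{Type: NodeUpdated, Node: self},
	}

	data, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("failed to encode node update: %w", err)
	}

	// Note: after JSON decoding on the receiving side, Payload arrives as
	// generic JSON and may need to be re-decoded into a NodeUpdate before the
	// type assertion in handleClusterMessage succeeds.
	return nc.Publish("aether.cluster.update", data)
}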
// checkNodeHealth verifies the health of known nodes
func (cm *ClusterManager) checkNodeHealth() {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	now := time.Now()
	for _, node := range cm.nodes {
		if now.Sub(node.LastSeen) > 90*time.Second && node.Status == NodeStatusActive {
			node.Status = NodeStatusFailed
			cm.logger.Printf("Node failed: %s", node.ID)
		}
	}
}

// rebalanceLoop runs periodic rebalancing checks (leader only)
func (cm *ClusterManager) rebalanceLoop() {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if cm.IsLeader() {
				cm.triggerShardRebalancing("periodic rebalance check")
			}
		case <-cm.ctx.Done():
			return
		}
	}
}

// GetNodes returns a copy of the current cluster nodes
func (cm *ClusterManager) GetNodes() map[string]*NodeInfo {
	cm.mutex.RLock()
	defer cm.mutex.RUnlock()

	nodes := make(map[string]*NodeInfo)
	for id, node := range cm.nodes {
		// Create a copy to prevent external mutation
		nodeCopy := *node
		nodes[id] = &nodeCopy
	}
	return nodes
}

// GetShardMap returns a copy of the current shard mapping
func (cm *ClusterManager) GetShardMap() *ShardMap {
	cm.mutex.RLock()
	defer cm.mutex.RUnlock()

	// Copy the shard and node maps so callers cannot mutate internal state
	shards := make(map[int][]string, len(cm.shardMap.Shards))
	for shardID, members := range cm.shardMap.Shards {
		shards[shardID] = append([]string(nil), members...)
	}
	nodes := make(map[string]NodeInfo, len(cm.shardMap.Nodes))
	for id, node := range cm.shardMap.Nodes {
		nodes[id] = node
	}

	return &ShardMap{
		Version:    cm.shardMap.Version,
		Shards:     shards,
		Nodes:      nodes,
		UpdateTime: cm.shardMap.UpdateTime,
	}
}
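// runClusterManagerExample is a minimal wiring sketch showing how the pieces
// above might be assembled: connect to NATS, build the manager, attach a VM
// registry, and run until the context is cancelled. The NATS URL and the
// registry argument are placeholders, not prescribed by this package.
func runClusterManagerExample(ctx context.Context, nodeID string, registry VMRegistry) error {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		return fmt.Errorf("failed to connect to NATS: %w", err)
	}
	defer nc.Close()

	cm, err := NewClusterManager(nodeID, nc, ctx)
	if err != nil {
		return fmt.Errorf("failed to create cluster manager: %w", err)
	}

	cm.SetVMRegistry(registry)
	cm.Start()
	defer cm.Stop()

	// Block until the caller cancels the context.
	<-ctx.Done()
	return nil
}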