Replace interface{} with properly defined interfaces
All checks were successful
CI / build (pull_request) Successful in 16s
- Add VirtualMachine interface with GetID(), GetActorID(), GetState()
- Add VMState type with idle/running/paused/stopped states
- Add RuntimeModel interface for event storming model contracts
- Add RuntimeMessage interface for actor message contracts
- Add VMProvider interface for decoupled VM access
- Update VMRegistry.GetActiveVMs() to return map[string]VirtualMachine
- Update Runtime interface to use RuntimeModel and RuntimeMessage
- Update DistributedVMRegistry to use VMProvider instead of interface{}
- Add SetVMProvider method to DistributedVM for dependency injection
This improves type safety, makes contracts explicit, and enables better
IDE support while avoiding import cycles through interface segregation.
Closes #37
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
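Note: the interface definitions listed above live outside the file shown in the diff below, so here is a rough sketch, inferred only from the commit message, of what the new contracts could look like. The VirtualMachine methods and the four VMState values come from the bullets; VMProvider's method set, the underlying type of VMState, and the package name are assumptions, and RuntimeModel/RuntimeMessage are omitted because their method sets are not described here.

// Sketch only; not code from this patch.
package vm // hypothetical package name

// VMState enumerates the lifecycle states named in the commit message.
type VMState string // underlying type is an assumption

const (
    VMStateIdle    VMState = "idle"
    VMStateRunning VMState = "running"
    VMStatePaused  VMState = "paused"
    VMStateStopped VMState = "stopped"
)

// VirtualMachine is the read-only view of a VM used by cluster code.
type VirtualMachine interface {
    GetID() string
    GetActorID() string
    GetState() VMState
}

// VMProvider decouples cluster components from the concrete VM registry.
// The exact method set is an assumption; the commit only names the interface.
type VMProvider interface {
    GetVM(actorID string) (VirtualMachine, bool)
}

A DistributedVM would then receive its provider through the new SetVMProvider method, e.g. dvm.SetVMProvider(registry), though that wiring is not part of the hunks below.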
@@ -12,9 +12,12 @@ import (
     "github.com/nats-io/nats.go"
 )

-// VMRegistry provides access to local VM information for cluster operations
+// VMRegistry provides access to local VM information for cluster operations.
+// Implementations must provide thread-safe access to VM data.
 type VMRegistry interface {
-    GetActiveVMs() map[string]interface{} // VirtualMachine interface to avoid import cycles
+    // GetActiveVMs returns a map of actor IDs to their VirtualMachine instances
+    GetActiveVMs() map[string]VirtualMachine
+    // GetShard returns the shard number for a given actor ID
     GetShard(actorID string) int
 }

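To illustrate the type-safety gain from the hunk above, a hypothetical caller of GetActiveVMs might look like the following. It assumes the VirtualMachine/VMState sketch given earlier and is not code from the patch.

// countRunning is a hypothetical helper. With the old map[string]interface{}
// return type, each value needed a type assertion before use; with the typed
// map, the VirtualMachine methods are available directly.
func countRunning(reg VMRegistry) int {
    running := 0
    for _, vm := range reg.GetActiveVMs() {
        if vm.GetState() == VMStateRunning {
            running++
        }
    }
    return running
}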
@@ -50,13 +53,13 @@ func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context)
     // Create leadership election with callbacks
     callbacks := LeaderElectionCallbacks{
         OnBecameLeader: func() {
-            cm.logger.Printf("👑 This node became the cluster leader - can initiate rebalancing")
+            cm.logger.Printf("This node became the cluster leader - can initiate rebalancing")
         },
         OnLostLeader: func() {
-            cm.logger.Printf("📉 This node lost cluster leadership")
+            cm.logger.Printf("This node lost cluster leadership")
         },
         OnNewLeader: func(leaderID string) {
-            cm.logger.Printf("🔄 Cluster leadership changed to: %s", leaderID)
+            cm.logger.Printf("Cluster leadership changed to: %s", leaderID)
         },
     }

@@ -71,7 +74,7 @@ func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context)

 // Start begins cluster management operations
 func (cm *ClusterManager) Start() {
-    cm.logger.Printf("🚀 Starting cluster manager")
+    cm.logger.Printf("Starting cluster manager")

     // Start leader election
     cm.election.Start()
@@ -88,7 +91,7 @@ func (cm *ClusterManager) Start() {

 // Stop gracefully stops the cluster manager
 func (cm *ClusterManager) Stop() {
-    cm.logger.Printf("🛑 Stopping cluster manager")
+    cm.logger.Printf("Stopping cluster manager")

     if cm.election != nil {
         cm.election.Stop()
@@ -138,7 +141,7 @@ func (cm *ClusterManager) GetActorsInShard(shardID int) []string {
 func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
     var clusterMsg ClusterMessage
     if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil {
-        cm.logger.Printf("⚠️ Invalid cluster message: %v", err)
+        cm.logger.Printf("Invalid cluster message: %v", err)
         return
     }

@@ -152,7 +155,7 @@ func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
             cm.handleNodeUpdate(update)
         }
     default:
-        cm.logger.Printf("⚠️ Unknown cluster message type: %s", clusterMsg.Type)
+        cm.logger.Printf("Unknown cluster message type: %s", clusterMsg.Type)
     }
 }

@@ -165,12 +168,12 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
     case NodeJoined:
         cm.nodes[update.Node.ID] = update.Node
         cm.hashRing.AddNode(update.Node.ID)
-        cm.logger.Printf("➕ Node joined: %s", update.Node.ID)
+        cm.logger.Printf("Node joined: %s", update.Node.ID)

     case NodeLeft:
         delete(cm.nodes, update.Node.ID)
         cm.hashRing.RemoveNode(update.Node.ID)
-        cm.logger.Printf("➖ Node left: %s", update.Node.ID)
+        cm.logger.Printf("Node left: %s", update.Node.ID)

     case NodeUpdated:
         if node, exists := cm.nodes[update.Node.ID]; exists {
@@ -188,7 +191,7 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
     for _, node := range cm.nodes {
         if now.Sub(node.LastSeen) > 90*time.Second && node.Status != NodeStatusFailed {
             node.Status = NodeStatusFailed
-            cm.logger.Printf("❌ Node marked as failed: %s (last seen: %s)",
+            cm.logger.Printf("Node marked as failed: %s (last seen: %s)",
                 node.ID, node.LastSeen.Format(time.RFC3339))
         }
     }
@@ -212,7 +215,7 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {

 // handleRebalanceRequest processes cluster rebalancing requests
 func (cm *ClusterManager) handleRebalanceRequest(msg ClusterMessage) {
-    cm.logger.Printf("🔄 Handling rebalance request from %s", msg.From)
+    cm.logger.Printf("Handling rebalance request from %s", msg.From)

     // Implementation would handle the specific rebalancing logic
     // This is a simplified version
@@ -220,7 +223,7 @@ func (cm *ClusterManager) handleRebalanceRequest(msg ClusterMessage) {

 // handleMigrationRequest processes actor migration requests
 func (cm *ClusterManager) handleMigrationRequest(msg ClusterMessage) {
-    cm.logger.Printf("🚚 Handling migration request from %s", msg.From)
+    cm.logger.Printf("Handling migration request from %s", msg.From)

     // Implementation would handle the specific migration logic
     // This is a simplified version
@@ -232,7 +235,7 @@ func (cm *ClusterManager) triggerShardRebalancing(reason string) {
         return // Only leader can initiate rebalancing
     }

-    cm.logger.Printf("⚖️ Triggering shard rebalancing: %s", reason)
+    cm.logger.Printf("Triggering shard rebalancing: %s", reason)

     // Get active nodes
     var activeNodes []*NodeInfo
@@ -245,12 +248,12 @@ func (cm *ClusterManager) triggerShardRebalancing(reason string) {
     cm.mutex.RUnlock()

     if len(activeNodes) == 0 {
-        cm.logger.Printf("⚠️ No active nodes available for rebalancing")
+        cm.logger.Printf("No active nodes available for rebalancing")
         return
     }

     // This would implement the actual rebalancing logic
-    cm.logger.Printf("🎯 Would rebalance across %d active nodes", len(activeNodes))
+    cm.logger.Printf("Would rebalance across %d active nodes", len(activeNodes))
 }

 // monitorNodes periodically checks node health and updates
@@ -279,7 +282,7 @@ func (cm *ClusterManager) checkNodeHealth() {
     for _, node := range cm.nodes {
         if now.Sub(node.LastSeen) > 90*time.Second && node.Status == NodeStatusActive {
             node.Status = NodeStatusFailed
-            cm.logger.Printf("💔 Node failed: %s", node.ID)
+            cm.logger.Printf("Node failed: %s", node.ID)
         }
     }
 }
@@ -328,4 +331,4 @@ func (cm *ClusterManager) GetShardMap() *ShardMap {
         Nodes:      make(map[string]NodeInfo),
         UpdateTime: cm.shardMap.UpdateTime,
     }
-}
+}