diff --git a/cluster/distributed.go b/cluster/distributed.go
index b38e02c..6e9eb4f 100644
--- a/cluster/distributed.go
+++ b/cluster/distributed.go
@@ -12,7 +12,7 @@ import (
 type DistributedVM struct {
 	nodeID       string
 	cluster      *ClusterManager
-	localRuntime Runtime // Interface to avoid import cycles
+	localRuntime Runtime
 	sharding     *ShardManager
 	discovery    *NodeDiscovery
 	natsConn     *nats.Conn
@@ -20,17 +20,29 @@ type DistributedVM struct {
 	cancel context.CancelFunc
 }
 
-// Runtime interface to avoid import cycles with main aether package
+// Runtime defines the interface for a local runtime that executes actors.
+// This interface decouples the cluster package from specific runtime implementations.
 type Runtime interface {
+	// Start initializes and starts the runtime
 	Start() error
-	LoadModel(model interface{}) error
-	SendMessage(message interface{}) error
+	// LoadModel loads an EventStorming model into the runtime
+	LoadModel(model RuntimeModel) error
+	// SendMessage sends a message to an actor in the runtime
+	SendMessage(message RuntimeMessage) error
 }
 
-// DistributedVMRegistry implements VMRegistry using DistributedVM's local runtime and sharding
+// DistributedVMRegistry implements VMRegistry using DistributedVM's local runtime and sharding.
+// It provides the cluster manager with access to VM information without import cycles.
 type DistributedVMRegistry struct {
-	runtime  interface{} // Runtime interface to avoid import cycles
-	sharding *ShardManager
+	vmProvider VMProvider
+	sharding   *ShardManager
+}
+
+// VMProvider defines an interface for accessing VMs from a runtime.
+// This is used by DistributedVMRegistry to get VM information.
+type VMProvider interface {
+	// GetActiveVMs returns a map of actor IDs to their VirtualMachine instances
+	GetActiveVMs() map[string]VirtualMachine
 }
 
 // NewDistributedVM creates a distributed VM runtime cluster node
@@ -67,16 +79,19 @@ func NewDistributedVM(nodeID string, natsURLs []string, localRuntime Runtime) (*
 		cancel: cancel,
 	}
 
-	// Create VM registry and connect it to cluster manager
-	vmRegistry := &DistributedVMRegistry{
-		runtime:  localRuntime,
-		sharding: sharding,
-	}
-	cluster.SetVMRegistry(vmRegistry)
-
 	return dvm, nil
 }
 
+// SetVMProvider sets the VM provider for the distributed VM registry.
+// This should be called after the runtime is fully initialized.
+func (dvm *DistributedVM) SetVMProvider(provider VMProvider) {
+	vmRegistry := &DistributedVMRegistry{
+		vmProvider: provider,
+		sharding:   dvm.sharding,
+	}
+	dvm.cluster.SetVMRegistry(vmRegistry)
+}
+
 // Start begins the distributed VM cluster node
 func (dvm *DistributedVM) Start() error {
 	// Start local runtime
@@ -103,7 +118,7 @@
 }
 
 // LoadModel distributes EventStorming model across the cluster with VM templates
-func (dvm *DistributedVM) LoadModel(model interface{}) error {
+func (dvm *DistributedVM) LoadModel(model RuntimeModel) error {
 	// Load model locally first
 	if err := dvm.localRuntime.LoadModel(model); err != nil {
 		return fmt.Errorf("failed to load model locally: %w", err)
@@ -121,7 +136,7 @@
 }
 
 // SendMessage routes messages across the distributed cluster
-func (dvm *DistributedVM) SendMessage(message interface{}) error {
+func (dvm *DistributedVM) SendMessage(message RuntimeMessage) error {
 	// This is a simplified implementation
 	// In practice, this would determine the target node based on sharding
 	// and route the message appropriately
@@ -162,15 +177,29 @@ func (dvm *DistributedVM) handleClusterMessage(msg *nats.Msg) {
 	switch clusterMsg.Type {
 	case "load_model":
 		// Handle model loading from other nodes
-		if model := clusterMsg.Payload; model != nil {
-			dvm.localRuntime.LoadModel(model)
+		// Re-marshal and unmarshal to convert map[string]interface{} to concrete type
+		payloadBytes, err := json.Marshal(clusterMsg.Payload)
+		if err != nil {
+			return
 		}
+		var model ModelPayload
+		if err := json.Unmarshal(payloadBytes, &model); err != nil {
+			return
+		}
+		dvm.localRuntime.LoadModel(&model)
 
 	case "route_message":
 		// Handle message routing from other nodes
-		if message := clusterMsg.Payload; message != nil {
-			dvm.localRuntime.SendMessage(message)
+		// Re-marshal and unmarshal to convert map[string]interface{} to concrete type
+		payloadBytes, err := json.Marshal(clusterMsg.Payload)
+		if err != nil {
+			return
 		}
+		var message MessagePayload
+		if err := json.Unmarshal(payloadBytes, &message); err != nil {
+			return
+		}
+		dvm.localRuntime.SendMessage(&message)
 
 	case "rebalance":
 		// Handle shard rebalancing requests
@@ -208,11 +237,12 @@ func (dvm *DistributedVM) GetClusterInfo() map[string]interface{} {
 	}
 }
 
-// GetActiveVMs returns a map of active VMs (implementation depends on runtime)
-func (dvr *DistributedVMRegistry) GetActiveVMs() map[string]interface{} {
-	// This would need to access the actual runtime's VM registry
-	// For now, return empty map to avoid import cycles
-	return make(map[string]interface{})
+// GetActiveVMs returns a map of active VMs from the VM provider
+func (dvr *DistributedVMRegistry) GetActiveVMs() map[string]VirtualMachine {
+	if dvr.vmProvider == nil {
+		return make(map[string]VirtualMachine)
+	}
+	return dvr.vmProvider.GetActiveVMs()
 }
 
 // GetShard returns the shard number for the given actor ID
diff --git a/cluster/manager.go b/cluster/manager.go
index 821f469..2171408 100644
--- a/cluster/manager.go
+++ b/cluster/manager.go
@@ -12,9 +12,12 @@ import (
 	"github.com/nats-io/nats.go"
 )
 
-// VMRegistry provides access to local VM information for cluster operations
+// VMRegistry provides access to local VM information for cluster operations.
+// Implementations must provide thread-safe access to VM data.
 type VMRegistry interface {
-	GetActiveVMs() map[string]interface{} // VirtualMachine interface to avoid import cycles
+	// GetActiveVMs returns a map of actor IDs to their VirtualMachine instances
+	GetActiveVMs() map[string]VirtualMachine
+	// GetShard returns the shard number for a given actor ID
 	GetShard(actorID string) int
 }
 
@@ -50,13 +53,13 @@ func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context)
 	// Create leadership election with callbacks
 	callbacks := LeaderElectionCallbacks{
 		OnBecameLeader: func() {
-			cm.logger.Printf("👑 This node became the cluster leader - can initiate rebalancing")
+			cm.logger.Printf("This node became the cluster leader - can initiate rebalancing")
 		},
 		OnLostLeader: func() {
-			cm.logger.Printf("📉 This node lost cluster leadership")
+			cm.logger.Printf("This node lost cluster leadership")
 		},
 		OnNewLeader: func(leaderID string) {
-			cm.logger.Printf("🔄 Cluster leadership changed to: %s", leaderID)
+			cm.logger.Printf("Cluster leadership changed to: %s", leaderID)
 		},
 	}
 
@@ -71,7 +74,7 @@ func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context)
 
 // Start begins cluster management operations
 func (cm *ClusterManager) Start() {
-	cm.logger.Printf("🚀 Starting cluster manager")
+	cm.logger.Printf("Starting cluster manager")
 
 	// Start leader election
 	cm.election.Start()
@@ -88,7 +91,7 @@
 
 // Stop gracefully stops the cluster manager
 func (cm *ClusterManager) Stop() {
-	cm.logger.Printf("🛑 Stopping cluster manager")
+	cm.logger.Printf("Stopping cluster manager")
 
 	if cm.election != nil {
 		cm.election.Stop()
@@ -138,7 +141,7 @@ func (cm *ClusterManager) GetActorsInShard(shardID int) []string {
 func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
 	var clusterMsg ClusterMessage
 	if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil {
-		cm.logger.Printf("⚠️ Invalid cluster message: %v", err)
+		cm.logger.Printf("Invalid cluster message: %v", err)
 		return
 	}
 
@@ -152,7 +155,7 @@ func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
 			cm.handleNodeUpdate(update)
 		}
 	default:
-		cm.logger.Printf("⚠️ Unknown cluster message type: %s", clusterMsg.Type)
+		cm.logger.Printf("Unknown cluster message type: %s", clusterMsg.Type)
 	}
 }
 
@@ -165,12 +168,12 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
 	case NodeJoined:
 		cm.nodes[update.Node.ID] = update.Node
 		cm.hashRing.AddNode(update.Node.ID)
-		cm.logger.Printf("➕ Node joined: %s", update.Node.ID)
+		cm.logger.Printf("Node joined: %s", update.Node.ID)
 
 	case NodeLeft:
 		delete(cm.nodes, update.Node.ID)
 		cm.hashRing.RemoveNode(update.Node.ID)
-		cm.logger.Printf("➖ Node left: %s", update.Node.ID)
+		cm.logger.Printf("Node left: %s", update.Node.ID)
 
 	case NodeUpdated:
 		if node, exists := cm.nodes[update.Node.ID]; exists {
@@ -188,7 +191,7 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
 	for _, node := range cm.nodes {
 		if now.Sub(node.LastSeen) > 90*time.Second && node.Status != NodeStatusFailed {
 			node.Status = NodeStatusFailed
-			cm.logger.Printf("❌ Node marked as failed: %s (last seen: %s)",
+			cm.logger.Printf("Node marked as failed: %s (last seen: %s)",
 				node.ID, node.LastSeen.Format(time.RFC3339))
 		}
 	}
@@ -212,7 +215,7 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
 
 // handleRebalanceRequest processes cluster rebalancing requests
 func (cm *ClusterManager) handleRebalanceRequest(msg ClusterMessage) {
-	cm.logger.Printf("🔄 Handling rebalance request from %s", msg.From)
+	cm.logger.Printf("Handling rebalance request from %s", msg.From)
 
 	// Implementation would handle the specific rebalancing logic
 	// This is a simplified version
@@ -220,7 +223,7 @@
 
 // handleMigrationRequest processes actor migration requests
 func (cm *ClusterManager) handleMigrationRequest(msg ClusterMessage) {
-	cm.logger.Printf("🚚 Handling migration request from %s", msg.From)
+	cm.logger.Printf("Handling migration request from %s", msg.From)
 
 	// Implementation would handle the specific migration logic
 	// This is a simplified version
@@ -232,7 +235,7 @@ func (cm *ClusterManager) triggerShardRebalancing(reason string) {
 		return // Only leader can initiate rebalancing
 	}
 
-	cm.logger.Printf("⚖️ Triggering shard rebalancing: %s", reason)
+	cm.logger.Printf("Triggering shard rebalancing: %s", reason)
 
 	// Get active nodes
 	var activeNodes []*NodeInfo
@@ -245,12 +248,12 @@ func (cm *ClusterManager) triggerShardRebalancing(reason string) {
 	cm.mutex.RUnlock()
 
 	if len(activeNodes) == 0 {
-		cm.logger.Printf("⚠️ No active nodes available for rebalancing")
+		cm.logger.Printf("No active nodes available for rebalancing")
 		return
 	}
 
 	// This would implement the actual rebalancing logic
-	cm.logger.Printf("🎯 Would rebalance across %d active nodes", len(activeNodes))
+	cm.logger.Printf("Would rebalance across %d active nodes", len(activeNodes))
 }
 
 // monitorNodes periodically checks node health and updates
@@ -279,7 +282,7 @@ func (cm *ClusterManager) checkNodeHealth() {
 	for _, node := range cm.nodes {
 		if now.Sub(node.LastSeen) > 90*time.Second && node.Status == NodeStatusActive {
 			node.Status = NodeStatusFailed
-			cm.logger.Printf("💔 Node failed: %s", node.ID)
+			cm.logger.Printf("Node failed: %s", node.ID)
 		}
 	}
 }
diff --git a/cluster/types.go b/cluster/types.go
index f0a13f5..112e555 100644
--- a/cluster/types.go
+++ b/cluster/types.go
@@ -137,3 +137,69 @@ type LeadershipLease struct {
 	ExpiresAt time.Time `json:"expiresAt"`
 	StartedAt time.Time `json:"startedAt"`
 }
+
+// VirtualMachine defines the interface for a virtual machine instance.
+// This interface provides the minimal contract needed by the cluster package
+// to interact with VMs without creating import cycles with the runtime package.
+type VirtualMachine interface {
+	// GetID returns the unique identifier for this VM
+	GetID() string
+	// GetActorID returns the actor ID this VM represents
+	GetActorID() string
+	// GetState returns the current state of the VM
+	GetState() VMState
+}
+
+// VMState represents the state of a virtual machine
+type VMState string
+
+const (
+	VMStateIdle    VMState = "idle"
+	VMStateRunning VMState = "running"
+	VMStatePaused  VMState = "paused"
+	VMStateStopped VMState = "stopped"
+)
+
+// RuntimeModel defines the interface for an EventStorming model that can be loaded into a runtime.
+// This decouples the cluster package from the specific eventstorming package.
+type RuntimeModel interface {
+	// GetID returns the unique identifier for this model
+	GetID() string
+	// GetName returns the name of this model
+	GetName() string
+}
+
+// RuntimeMessage defines the interface for messages that can be sent through the runtime.
+// This provides type safety for inter-actor communication without creating import cycles.
+type RuntimeMessage interface {
+	// GetTargetActorID returns the ID of the actor this message is addressed to
+	GetTargetActorID() string
+	// GetType returns the message type identifier
+	GetType() string
+}
+
+// ModelPayload is a concrete type for JSON-unmarshaling RuntimeModel payloads.
+// Use this when receiving model data over the network.
+type ModelPayload struct {
+	ID   string `json:"id"`
+	Name string `json:"name"`
+}
+
+// GetID implements RuntimeModel
+func (m *ModelPayload) GetID() string { return m.ID }
+
+// GetName implements RuntimeModel
+func (m *ModelPayload) GetName() string { return m.Name }
+
+// MessagePayload is a concrete type for JSON-unmarshaling RuntimeMessage payloads.
+// Use this when receiving message data over the network.
+type MessagePayload struct {
+	TargetActorID string `json:"targetActorId"`
+	Type          string `json:"type"`
+}
+
+// GetTargetActorID implements RuntimeMessage
+func (m *MessagePayload) GetTargetActorID() string { return m.TargetActorID }
+
+// GetType implements RuntimeMessage
+func (m *MessagePayload) GetType() string { return m.Type }
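
For context, a minimal sketch of how a local runtime might wire into the new interfaces, assuming hypothetical types ExampleRuntime and ExampleVM and an assumed module path example.com/aether/cluster (none of which are part of this patch). Because the registry is no longer created inside NewDistributedVM, the caller must invoke SetVMProvider explicitly once the runtime is ready.

```go
package main

import (
	"fmt"

	"example.com/aether/cluster" // assumed module path; adjust to the real one
)

// ExampleVM is a hypothetical implementation of cluster.VirtualMachine.
type ExampleVM struct {
	id      string
	actorID string
	state   cluster.VMState
}

func (vm *ExampleVM) GetID() string             { return vm.id }
func (vm *ExampleVM) GetActorID() string        { return vm.actorID }
func (vm *ExampleVM) GetState() cluster.VMState { return vm.state }

// ExampleRuntime is a hypothetical local runtime that satisfies both
// cluster.Runtime and cluster.VMProvider.
type ExampleRuntime struct {
	vms map[string]cluster.VirtualMachine
}

func (r *ExampleRuntime) Start() error { return nil }

func (r *ExampleRuntime) LoadModel(model cluster.RuntimeModel) error {
	fmt.Printf("loading model %s (%s)\n", model.GetName(), model.GetID())
	return nil
}

func (r *ExampleRuntime) SendMessage(message cluster.RuntimeMessage) error {
	fmt.Printf("delivering %s to actor %s\n", message.GetType(), message.GetTargetActorID())
	return nil
}

func (r *ExampleRuntime) GetActiveVMs() map[string]cluster.VirtualMachine {
	return r.vms
}

func main() {
	rt := &ExampleRuntime{vms: map[string]cluster.VirtualMachine{
		"order-123": &ExampleVM{id: "vm-1", actorID: "order-123", state: cluster.VMStateRunning},
	}}

	dvm, err := cluster.NewDistributedVM("node-1", []string{"nats://localhost:4222"}, rt)
	if err != nil {
		panic(err)
	}

	// The registry is no longer wired up in the constructor; set the
	// provider explicitly after the runtime is fully initialized.
	dvm.SetVMProvider(rt)

	if err := dvm.Start(); err != nil {
		panic(err)
	}

	// Payloads received over NATS are decoded into the concrete payload types.
	_ = dvm.SendMessage(&cluster.MessagePayload{TargetActorID: "order-123", Type: "PlaceOrder"})
}
```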