All checks were successful
CI / build (pull_request) Successful in 16s
Unmarshaling JSON into an interface{} field produces map[string]interface{}, not concrete types.
Added concrete ModelPayload and MessagePayload types that implement the
RuntimeModel and RuntimeMessage interfaces, respectively.
handleClusterMessage now re-marshals and unmarshals the payload to
convert it from map[string]interface{} to the proper concrete type.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
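For context, the fix relies on a standard encoding/json pattern: decoding into an interface{} field always yields map[string]interface{}, and round-tripping that value through json.Marshal/json.Unmarshal recovers a concrete struct. Below is a minimal, self-contained sketch of the pattern; Envelope and Payload are illustrative stand-ins, not the actual ClusterMessage or ModelPayload definitions.

package main

import (
    "encoding/json"
    "fmt"
)

// Envelope mirrors a generic message wrapper: Payload is typed as interface{},
// so json.Unmarshal can only fill it with map[string]interface{}.
type Envelope struct {
    Type    string      `json:"type"`
    Payload interface{} `json:"payload"`
}

// Payload stands in for a concrete type such as ModelPayload.
type Payload struct {
    Name string `json:"name"`
}

func main() {
    raw := []byte(`{"type":"load_model","payload":{"name":"orders"}}`)

    var env Envelope
    if err := json.Unmarshal(raw, &env); err != nil {
        panic(err)
    }
    fmt.Printf("%T\n", env.Payload) // map[string]interface {}

    // Round-trip the payload through JSON to obtain the concrete type.
    buf, err := json.Marshal(env.Payload)
    if err != nil {
        panic(err)
    }
    var p Payload
    if err := json.Unmarshal(buf, &p); err != nil {
        panic(err)
    }
    fmt.Println(p.Name) // orders
}

The same round-trip appears twice in handleClusterMessage below, once for each payload type.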
252 lines
7.4 KiB
Go
package cluster

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/nats-io/nats.go"
)

// DistributedVM manages a cluster of runtime nodes with VM-per-instance architecture
type DistributedVM struct {
    nodeID       string
    cluster      *ClusterManager
    localRuntime Runtime
    sharding     *ShardManager
    discovery    *NodeDiscovery
    natsConn     *nats.Conn
    ctx          context.Context
    cancel       context.CancelFunc
}

// Runtime defines the interface for a local runtime that executes actors.
// This interface decouples the cluster package from specific runtime implementations.
type Runtime interface {
    // Start initializes and starts the runtime
    Start() error
    // LoadModel loads an EventStorming model into the runtime
    LoadModel(model RuntimeModel) error
    // SendMessage sends a message to an actor in the runtime
    SendMessage(message RuntimeMessage) error
}

// DistributedVMRegistry implements VMRegistry using DistributedVM's local runtime and sharding.
// It provides the cluster manager with access to VM information without import cycles.
type DistributedVMRegistry struct {
    vmProvider VMProvider
    sharding   *ShardManager
}

// VMProvider defines an interface for accessing VMs from a runtime.
// This is used by DistributedVMRegistry to get VM information.
type VMProvider interface {
    // GetActiveVMs returns a map of actor IDs to their VirtualMachine instances
    GetActiveVMs() map[string]VirtualMachine
}

// NewDistributedVM creates a distributed VM runtime cluster node
func NewDistributedVM(nodeID string, natsURLs []string, localRuntime Runtime) (*DistributedVM, error) {
    ctx, cancel := context.WithCancel(context.Background())

    // Connect to NATS cluster
    natsURL := natsURLs[0] // Use first URL for simplicity
    natsConn, err := nats.Connect(natsURL,
        nats.Name(fmt.Sprintf("aether-runtime-%s", nodeID)))
    if err != nil {
        cancel()
        return nil, fmt.Errorf("failed to connect to NATS: %w", err)
    }

    // Create cluster components
    discovery := NewNodeDiscovery(nodeID, natsConn, ctx)
    sharding := NewShardManager(1024, 3) // 1024 shards, 3 replicas
    cluster, err := NewClusterManager(nodeID, natsConn, ctx)
    if err != nil {
        cancel()
        natsConn.Close()
        return nil, fmt.Errorf("failed to create cluster manager: %w", err)
    }

    dvm := &DistributedVM{
        nodeID:       nodeID,
        cluster:      cluster,
        localRuntime: localRuntime,
        sharding:     sharding,
        discovery:    discovery,
        natsConn:     natsConn,
        ctx:          ctx,
        cancel:       cancel,
    }

    return dvm, nil
}

// SetVMProvider sets the VM provider for the distributed VM registry.
// This should be called after the runtime is fully initialized.
func (dvm *DistributedVM) SetVMProvider(provider VMProvider) {
    vmRegistry := &DistributedVMRegistry{
        vmProvider: provider,
        sharding:   dvm.sharding,
    }
    dvm.cluster.SetVMRegistry(vmRegistry)
}

// Start begins the distributed VM cluster node
func (dvm *DistributedVM) Start() error {
    // Start local runtime
    if err := dvm.localRuntime.Start(); err != nil {
        return fmt.Errorf("failed to start local runtime: %w", err)
    }

    // Start cluster services
    go dvm.discovery.Start()
    go dvm.cluster.Start()

    // Start message routing
    go dvm.startMessageRouting()

    return nil
}

// Stop gracefully shuts down the distributed VM node
func (dvm *DistributedVM) Stop() {
    dvm.cancel()
    dvm.cluster.Stop()
    dvm.discovery.Stop()
    dvm.natsConn.Close()
}

// LoadModel distributes an EventStorming model across the cluster with VM templates
func (dvm *DistributedVM) LoadModel(model RuntimeModel) error {
    // Load model locally first
    if err := dvm.localRuntime.LoadModel(model); err != nil {
        return fmt.Errorf("failed to load model locally: %w", err)
    }

    // Broadcast model to other cluster nodes
    msg := ClusterMessage{
        Type:    "load_model",
        From:    dvm.nodeID,
        To:      "broadcast",
        Payload: model,
    }

    return dvm.publishClusterMessage(msg)
}

// SendMessage routes messages across the distributed cluster
func (dvm *DistributedVM) SendMessage(message RuntimeMessage) error {
    // This is a simplified implementation.
    // In practice, this would determine the target node based on sharding
    // and route the message appropriately.

    return dvm.localRuntime.SendMessage(message)
}

// GetActorNode determines which node should handle a specific actor
func (dvm *DistributedVM) GetActorNode(actorID string) string {
    // Use consistent hashing to determine the target node
    return dvm.cluster.hashRing.GetNode(actorID)
}

// IsLocalActor checks if an actor should be handled by this node
func (dvm *DistributedVM) IsLocalActor(actorID string) bool {
    targetNode := dvm.GetActorNode(actorID)
    return targetNode == dvm.nodeID
}

// GetActorsInShard returns actors that belong to a specific shard on this node
func (dvm *DistributedVM) GetActorsInShard(shardID int) []string {
    return dvm.cluster.GetActorsInShard(shardID)
}

// startMessageRouting begins routing messages between cluster nodes
func (dvm *DistributedVM) startMessageRouting() {
    // Subscribe to cluster messages
    dvm.natsConn.Subscribe("aether.distributed.*", dvm.handleClusterMessage)
}

// handleClusterMessage processes incoming cluster coordination messages
func (dvm *DistributedVM) handleClusterMessage(msg *nats.Msg) {
    var clusterMsg ClusterMessage
    if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil {
        return
    }

    switch clusterMsg.Type {
    case "load_model":
        // Handle model loading from other nodes.
        // Re-marshal and unmarshal to convert map[string]interface{} to the concrete type.
        payloadBytes, err := json.Marshal(clusterMsg.Payload)
        if err != nil {
            return
        }
        var model ModelPayload
        if err := json.Unmarshal(payloadBytes, &model); err != nil {
            return
        }
        dvm.localRuntime.LoadModel(&model)

    case "route_message":
        // Handle message routing from other nodes.
        // Re-marshal and unmarshal to convert map[string]interface{} to the concrete type.
        payloadBytes, err := json.Marshal(clusterMsg.Payload)
        if err != nil {
            return
        }
        var message MessagePayload
        if err := json.Unmarshal(payloadBytes, &message); err != nil {
            return
        }
        dvm.localRuntime.SendMessage(&message)

    case "rebalance":
        // Handle shard rebalancing requests
        dvm.handleRebalanceRequest(clusterMsg)
    }
}

// handleRebalanceRequest processes shard rebalancing requests
func (dvm *DistributedVM) handleRebalanceRequest(msg ClusterMessage) {
    // Simplified rebalancing logic.
    // In practice, this would implement complex actor migration.
}

// publishClusterMessage sends a message to other cluster nodes
func (dvm *DistributedVM) publishClusterMessage(msg ClusterMessage) error {
    data, err := json.Marshal(msg)
    if err != nil {
        return err
    }

    subject := fmt.Sprintf("aether.distributed.%s", msg.Type)
    return dvm.natsConn.Publish(subject, data)
}

// GetClusterInfo returns information about the cluster state
func (dvm *DistributedVM) GetClusterInfo() map[string]interface{} {
    nodes := dvm.cluster.GetNodes()

    return map[string]interface{}{
        "nodeId":    dvm.nodeID,
        "isLeader":  dvm.cluster.IsLeader(),
        "leader":    dvm.cluster.GetLeader(),
        "nodeCount": len(nodes),
        "nodes":     nodes,
    }
}

// GetActiveVMs returns a map of active VMs from the VM provider
func (dvr *DistributedVMRegistry) GetActiveVMs() map[string]VirtualMachine {
    if dvr.vmProvider == nil {
        return make(map[string]VirtualMachine)
    }
    return dvr.vmProvider.GetActiveVMs()
}

// GetShard returns the shard number for the given actor ID
func (dvr *DistributedVMRegistry) GetShard(actorID string) int {
    return dvr.sharding.GetShard(actorID)
}
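SendMessage above is, by its own comment, a simplified implementation that always delivers locally. Below is a sketch of the shard-aware routing that comment alludes to, built from methods already in this file (IsLocalActor, GetActorNode, publishClusterMessage). It is not part of the committed code, and the ActorID() accessor on RuntimeMessage is an assumption, since that interface is defined elsewhere.

// sendMessageSharded sketches shard-aware routing; it is not part of the file above.
// It assumes RuntimeMessage exposes an ActorID() string accessor (hypothetical).
func (dvm *DistributedVM) sendMessageSharded(message RuntimeMessage) error {
    actorID := message.ActorID() // assumed accessor

    // Deliver locally when consistent hashing maps the actor to this node.
    if dvm.IsLocalActor(actorID) {
        return dvm.localRuntime.SendMessage(message)
    }

    // Otherwise forward to the owning node over the cluster bus. Note that
    // "aether.distributed.route_message" is a broadcast subject, so a fuller
    // version would also have handleClusterMessage filter on the To field.
    msg := ClusterMessage{
        Type:    "route_message",
        From:    dvm.nodeID,
        To:      dvm.GetActorNode(actorID),
        Payload: message,
    }
    return dvm.publishClusterMessage(msg)
}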
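As a usage note, here is a sketch of how a node built from this package might be wired up, using only the API shown above. The noopRuntime stub, the module import path, and the NATS URL are assumptions for illustration, not part of the package.

package main

import (
    "fmt"
    "log"

    "example.com/aether/cluster" // hypothetical import path for the package above
)

// noopRuntime is a stand-in Runtime implementation used only for illustration.
type noopRuntime struct{}

func (noopRuntime) Start() error                               { return nil }
func (noopRuntime) LoadModel(model cluster.RuntimeModel) error { return nil }
func (noopRuntime) SendMessage(m cluster.RuntimeMessage) error { return nil }

func main() {
    // Assumes a NATS server is reachable at the default local URL.
    dvm, err := cluster.NewDistributedVM("node-1", []string{"nats://localhost:4222"}, noopRuntime{})
    if err != nil {
        log.Fatal(err)
    }
    defer dvm.Stop()

    if err := dvm.Start(); err != nil {
        log.Fatal(err)
    }

    // Attach the VM provider only after the runtime is fully initialized,
    // as the SetVMProvider doc comment requires.
    // dvm.SetVMProvider(myProvider)

    fmt.Println(dvm.GetClusterInfo())
}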