Compare commits

...

3 Commits

b759c7fb97 Fix type assertion bug in handleClusterMessage
All checks were successful
CI / build (pull_request) Successful in 16s
JSON unmarshal produces map[string]interface{}, not concrete types.
Added ModelPayload and MessagePayload concrete types that implement
RuntimeModel and RuntimeMessage interfaces respectively.

handleClusterMessage now re-marshals and unmarshals the payload to convert it
from map[string]interface{} to the proper concrete type.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 16:45:28 +01:00
eaff315782 Replace interface{} with properly defined interfaces
- Add VirtualMachine interface with GetID(), GetActorID(), GetState()
- Add VMState type with idle/running/paused/stopped states
- Add RuntimeModel interface for event storming model contracts
- Add RuntimeMessage interface for actor message contracts
- Add VMProvider interface for decoupled VM access
- Update VMRegistry.GetActiveVMs() to return map[string]VirtualMachine
- Update Runtime interface to use RuntimeModel and RuntimeMessage
- Update DistributedVMRegistry to use VMProvider instead of interface{}
- Add SetVMProvider method to DistributedVM for dependency injection

This improves type safety, makes contracts explicit, and enables better
IDE support while avoiding import cycles through interface segregation.

Closes #37

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 16:45:28 +01:00
c757bb76f3 Make configuration values injectable rather than hardcoded
All checks were successful
CI / build (pull_request) Successful in 16s
CI / build (push) Successful in 15s
Add config structs with sensible defaults for tunable parameters:
- JetStreamConfig for stream retention (1 year) and replica count (1)
- HashRingConfig for virtual nodes per physical node (150)
- ShardConfig for shard count (1024) and replication factor (1)

Each component gets a new WithConfig constructor that accepts custom
configuration, while the original constructors continue to work with
defaults. Zero values in configs fall back to defaults for backward
compatibility.

Closes #38

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 15:33:56 +01:00
13 changed files with 494 additions and 116 deletions

View File

@@ -44,5 +44,4 @@
 // - Leader election ensures coordination continues despite node failures
 // - Actor migration allows rebalancing when cluster topology changes
 // - Graceful shutdown with proper resource cleanup
-//
 package cluster

cluster/config_test.go (new file, 125 lines)
View File

@@ -0,0 +1,125 @@
package cluster
import (
"strconv"
"testing"
)
func TestDefaultHashRingConfig(t *testing.T) {
config := DefaultHashRingConfig()
if config.VirtualNodes != DefaultVirtualNodes {
t.Errorf("expected VirtualNodes=%d, got %d", DefaultVirtualNodes, config.VirtualNodes)
}
}
func TestDefaultShardConfig(t *testing.T) {
config := DefaultShardConfig()
if config.ShardCount != DefaultNumShards {
t.Errorf("expected ShardCount=%d, got %d", DefaultNumShards, config.ShardCount)
}
if config.ReplicationFactor != 1 {
t.Errorf("expected ReplicationFactor=1, got %d", config.ReplicationFactor)
}
}
func TestNewConsistentHashRingWithConfig(t *testing.T) {
t.Run("custom virtual nodes", func(t *testing.T) {
config := HashRingConfig{VirtualNodes: 50}
ring := NewConsistentHashRingWithConfig(config)
ring.AddNode("test-node")
if len(ring.sortedHashes) != 50 {
t.Errorf("expected 50 virtual nodes, got %d", len(ring.sortedHashes))
}
if ring.GetVirtualNodes() != 50 {
t.Errorf("expected GetVirtualNodes()=50, got %d", ring.GetVirtualNodes())
}
})
t.Run("zero value uses default", func(t *testing.T) {
config := HashRingConfig{VirtualNodes: 0}
ring := NewConsistentHashRingWithConfig(config)
ring.AddNode("test-node")
if len(ring.sortedHashes) != DefaultVirtualNodes {
t.Errorf("expected %d virtual nodes, got %d", DefaultVirtualNodes, len(ring.sortedHashes))
}
})
t.Run("default constructor uses default config", func(t *testing.T) {
ring := NewConsistentHashRing()
ring.AddNode("test-node")
if len(ring.sortedHashes) != DefaultVirtualNodes {
t.Errorf("expected %d virtual nodes, got %d", DefaultVirtualNodes, len(ring.sortedHashes))
}
})
}
func TestNewShardManagerWithConfig(t *testing.T) {
t.Run("custom shard count", func(t *testing.T) {
config := ShardConfig{ShardCount: 256, ReplicationFactor: 2}
sm := NewShardManagerWithConfig(config)
if sm.GetShardCount() != 256 {
t.Errorf("expected shard count 256, got %d", sm.GetShardCount())
}
if sm.GetReplicationFactor() != 2 {
t.Errorf("expected replication factor 2, got %d", sm.GetReplicationFactor())
}
})
t.Run("zero values use defaults", func(t *testing.T) {
config := ShardConfig{ShardCount: 0, ReplicationFactor: 0}
sm := NewShardManagerWithConfig(config)
if sm.GetShardCount() != DefaultNumShards {
t.Errorf("expected shard count %d, got %d", DefaultNumShards, sm.GetShardCount())
}
if sm.GetReplicationFactor() != 1 {
t.Errorf("expected replication factor 1, got %d", sm.GetReplicationFactor())
}
})
t.Run("legacy constructor still works", func(t *testing.T) {
sm := NewShardManager(512, 3)
if sm.GetShardCount() != 512 {
t.Errorf("expected shard count 512, got %d", sm.GetShardCount())
}
if sm.GetReplicationFactor() != 3 {
t.Errorf("expected replication factor 3, got %d", sm.GetReplicationFactor())
}
})
}
func TestShardManagerGetShard_DifferentShardCounts(t *testing.T) {
testCases := []struct {
shardCount int
}{
{shardCount: 16},
{shardCount: 64},
{shardCount: 256},
{shardCount: 1024},
{shardCount: 4096},
}
for _, tc := range testCases {
t.Run("shardCount="+string(rune(tc.shardCount)), func(t *testing.T) {
sm := NewShardManagerWithConfig(ShardConfig{ShardCount: tc.shardCount})
// Verify all actor IDs map to valid shard range
for i := 0; i < 1000; i++ {
actorID := "actor-" + string(rune(i))
shard := sm.GetShard(actorID)
if shard < 0 || shard >= tc.shardCount {
t.Errorf("shard %d out of range [0, %d)", shard, tc.shardCount)
}
}
})
}
}

View File

@@ -12,7 +12,7 @@ import (
 type DistributedVM struct {
 	nodeID       string
 	cluster      *ClusterManager
-	localRuntime Runtime // Interface to avoid import cycles
+	localRuntime Runtime
 	sharding     *ShardManager
 	discovery    *NodeDiscovery
 	natsConn     *nats.Conn
@@ -20,17 +20,29 @@ type DistributedVM struct {
 	cancel   context.CancelFunc
 }
 
-// Runtime interface to avoid import cycles with main aether package
+// Runtime defines the interface for a local runtime that executes actors.
+// This interface decouples the cluster package from specific runtime implementations.
 type Runtime interface {
+	// Start initializes and starts the runtime
 	Start() error
-	LoadModel(model interface{}) error
-	SendMessage(message interface{}) error
+	// LoadModel loads an EventStorming model into the runtime
+	LoadModel(model RuntimeModel) error
+	// SendMessage sends a message to an actor in the runtime
+	SendMessage(message RuntimeMessage) error
 }
 
-// DistributedVMRegistry implements VMRegistry using DistributedVM's local runtime and sharding
+// DistributedVMRegistry implements VMRegistry using DistributedVM's local runtime and sharding.
+// It provides the cluster manager with access to VM information without import cycles.
 type DistributedVMRegistry struct {
-	runtime  interface{} // Runtime interface to avoid import cycles
+	vmProvider VMProvider
 	sharding *ShardManager
 }
+
+// VMProvider defines an interface for accessing VMs from a runtime.
+// This is used by DistributedVMRegistry to get VM information.
+type VMProvider interface {
+	// GetActiveVMs returns a map of actor IDs to their VirtualMachine instances
+	GetActiveVMs() map[string]VirtualMachine
+}
 
 // NewDistributedVM creates a distributed VM runtime cluster node
@@ -67,16 +79,19 @@ func NewDistributedVM(nodeID string, natsURLs []string, localRuntime Runtime) (*
 		cancel:   cancel,
 	}
 
-	// Create VM registry and connect it to cluster manager
-	vmRegistry := &DistributedVMRegistry{
-		runtime:  localRuntime,
-		sharding: sharding,
-	}
-	cluster.SetVMRegistry(vmRegistry)
-
 	return dvm, nil
 }
 
+// SetVMProvider sets the VM provider for the distributed VM registry.
+// This should be called after the runtime is fully initialized.
+func (dvm *DistributedVM) SetVMProvider(provider VMProvider) {
+	vmRegistry := &DistributedVMRegistry{
+		vmProvider: provider,
+		sharding:   dvm.sharding,
+	}
+	dvm.cluster.SetVMRegistry(vmRegistry)
+}
+
 // Start begins the distributed VM cluster node
 func (dvm *DistributedVM) Start() error {
 	// Start local runtime
@@ -103,7 +118,7 @@ func (dvm *DistributedVM) Stop() {
 }
 
 // LoadModel distributes EventStorming model across the cluster with VM templates
-func (dvm *DistributedVM) LoadModel(model interface{}) error {
+func (dvm *DistributedVM) LoadModel(model RuntimeModel) error {
 	// Load model locally first
 	if err := dvm.localRuntime.LoadModel(model); err != nil {
 		return fmt.Errorf("failed to load model locally: %w", err)
@@ -121,7 +136,7 @@ func (dvm *DistributedVM) LoadModel(model interface{}) error {
 }
 
 // SendMessage routes messages across the distributed cluster
-func (dvm *DistributedVM) SendMessage(message interface{}) error {
+func (dvm *DistributedVM) SendMessage(message RuntimeMessage) error {
 	// This is a simplified implementation
 	// In practice, this would determine the target node based on sharding
 	// and route the message appropriately
@@ -162,15 +177,29 @@ func (dvm *DistributedVM) handleClusterMessage(msg *nats.Msg) {
 	switch clusterMsg.Type {
 	case "load_model":
 		// Handle model loading from other nodes
-		if model := clusterMsg.Payload; model != nil {
-			dvm.localRuntime.LoadModel(model)
-		}
+		// Re-marshal and unmarshal to convert map[string]interface{} to concrete type
+		payloadBytes, err := json.Marshal(clusterMsg.Payload)
+		if err != nil {
+			return
+		}
+		var model ModelPayload
+		if err := json.Unmarshal(payloadBytes, &model); err != nil {
+			return
+		}
+		dvm.localRuntime.LoadModel(&model)
 	case "route_message":
 		// Handle message routing from other nodes
-		if message := clusterMsg.Payload; message != nil {
-			dvm.localRuntime.SendMessage(message)
-		}
+		// Re-marshal and unmarshal to convert map[string]interface{} to concrete type
+		payloadBytes, err := json.Marshal(clusterMsg.Payload)
+		if err != nil {
+			return
+		}
+		var message MessagePayload
+		if err := json.Unmarshal(payloadBytes, &message); err != nil {
+			return
+		}
+		dvm.localRuntime.SendMessage(&message)
 	case "rebalance":
 		// Handle shard rebalancing requests
@@ -200,19 +229,20 @@ func (dvm *DistributedVM) GetClusterInfo() map[string]interface{} {
 	nodes := dvm.cluster.GetNodes()
 
 	return map[string]interface{}{
 		"nodeId":    dvm.nodeID,
 		"isLeader":  dvm.cluster.IsLeader(),
 		"leader":    dvm.cluster.GetLeader(),
 		"nodeCount": len(nodes),
 		"nodes":     nodes,
 	}
 }
 
-// GetActiveVMs returns a map of active VMs (implementation depends on runtime)
-func (dvr *DistributedVMRegistry) GetActiveVMs() map[string]interface{} {
-	// This would need to access the actual runtime's VM registry
-	// For now, return empty map to avoid import cycles
-	return make(map[string]interface{})
+// GetActiveVMs returns a map of active VMs from the VM provider
+func (dvr *DistributedVMRegistry) GetActiveVMs() map[string]VirtualMachine {
+	if dvr.vmProvider == nil {
+		return make(map[string]VirtualMachine)
+	}
+	return dvr.vmProvider.GetActiveVMs()
 }
 
 // GetShard returns the shard number for the given actor ID
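The payload conversion that commit b759c7fb97 describes is a plain JSON round trip: the decoded cluster message carries a map[string]interface{}, which is re-encoded and decoded into the concrete payload type. A minimal standalone sketch of the pattern, independent of the aether codebase (identifiers here are illustrative):

package main

import (
	"encoding/json"
	"fmt"
)

// ModelPayload mirrors the concrete type added in the cluster package.
type ModelPayload struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

func main() {
	// What a decoded cluster-message payload looks like: a generic map.
	payload := map[string]interface{}{"id": "model-1", "name": "ordering"}

	// Re-marshal the map and unmarshal into the concrete type.
	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		panic(err)
	}
	var model ModelPayload
	if err := json.Unmarshal(payloadBytes, &model); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", model) // {ID:model-1 Name:ordering}
}

The round trip costs an extra encode/decode per message; keeping the payload as json.RawMessage and decoding it once would avoid that, but that is outside the scope of this change.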

View File

@@ -12,13 +12,24 @@ type ConsistentHashRing struct {
 	ring         map[uint32]string // hash -> node ID
 	sortedHashes []uint32          // sorted hash keys
 	nodes        map[string]bool   // active nodes
+	virtualNodes int               // number of virtual nodes per physical node
 }
 
-// NewConsistentHashRing creates a new consistent hash ring
+// NewConsistentHashRing creates a new consistent hash ring with default configuration
 func NewConsistentHashRing() *ConsistentHashRing {
+	return NewConsistentHashRingWithConfig(DefaultHashRingConfig())
+}
+
+// NewConsistentHashRingWithConfig creates a new consistent hash ring with custom configuration
+func NewConsistentHashRingWithConfig(config HashRingConfig) *ConsistentHashRing {
+	virtualNodes := config.VirtualNodes
+	if virtualNodes == 0 {
+		virtualNodes = DefaultVirtualNodes
+	}
 	return &ConsistentHashRing{
 		ring:         make(map[uint32]string),
 		nodes:        make(map[string]bool),
+		virtualNodes: virtualNodes,
 	}
 }
@@ -31,7 +42,7 @@ func (chr *ConsistentHashRing) AddNode(nodeID string) {
 	chr.nodes[nodeID] = true
 
 	// Add virtual nodes for better distribution
-	for i := 0; i < VirtualNodes; i++ {
+	for i := 0; i < chr.virtualNodes; i++ {
 		virtualKey := fmt.Sprintf("%s:%d", nodeID, i)
 		hash := chr.hash(virtualKey)
 		chr.ring[hash] = nodeID
@@ -103,3 +114,8 @@ func (chr *ConsistentHashRing) GetNodes() []string {
 func (chr *ConsistentHashRing) IsEmpty() bool {
 	return len(chr.nodes) == 0
 }
+
+// GetVirtualNodes returns the number of virtual nodes per physical node
+func (chr *ConsistentHashRing) GetVirtualNodes() int {
+	return chr.virtualNodes
+}

View File

@@ -42,7 +42,7 @@ func TestAddNode(t *testing.T) {
 	}
 
 	// Verify virtual nodes were added
-	expectedVirtualNodes := VirtualNodes
+	expectedVirtualNodes := DefaultVirtualNodes
 	if len(ring.sortedHashes) != expectedVirtualNodes {
 		t.Errorf("expected %d virtual nodes, got %d", expectedVirtualNodes, len(ring.sortedHashes))
 	}
@@ -86,7 +86,7 @@ func TestAddNode_MultipleNodes(t *testing.T) {
t.Errorf("expected 3 nodes, got %d", len(nodes)) t.Errorf("expected 3 nodes, got %d", len(nodes))
} }
expectedHashes := VirtualNodes * 3 expectedHashes := DefaultVirtualNodes * 3
if len(ring.sortedHashes) != expectedHashes { if len(ring.sortedHashes) != expectedHashes {
t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes)) t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes))
} }
@@ -118,7 +118,7 @@ func TestRemoveNode(t *testing.T) {
 	}
 
 	// Verify virtual nodes were removed
-	expectedHashes := VirtualNodes
+	expectedHashes := DefaultVirtualNodes
 	if len(ring.sortedHashes) != expectedHashes {
 		t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes))
 	}
@@ -321,7 +321,7 @@ func TestRingBehavior_ManyNodes(t *testing.T) {
 	}
 
 	// Verify virtual nodes count
-	expectedHashes := numNodes * VirtualNodes
+	expectedHashes := numNodes * DefaultVirtualNodes
 	if len(ring.sortedHashes) != expectedHashes {
 		t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes))
 	}
@@ -355,7 +355,7 @@ func TestRingBehavior_ManyNodes(t *testing.T) {
 	}
 }
 
-func TestVirtualNodes_ImproveDistribution(t *testing.T) {
+func TestDefaultVirtualNodes_ImproveDistribution(t *testing.T) {
 	// Test that virtual nodes actually improve distribution
 	// by comparing with a theoretical single-hash-per-node scenario
@@ -386,7 +386,7 @@ func TestVirtualNodes_ImproveDistribution(t *testing.T) {
 	stdDev := math.Sqrt(sumSquaredDiff / float64(numNodes))
 	coefficientOfVariation := stdDev / expectedPerNode
 
-	// With VirtualNodes=150, we expect good distribution
+	// With DefaultVirtualNodes=150, we expect good distribution
 	// Coefficient of variation should be low (< 15%)
 	if coefficientOfVariation > 0.15 {
 		t.Errorf("distribution has high coefficient of variation: %.2f%% (expected < 15%%)",
@@ -394,8 +394,8 @@ func TestVirtualNodes_ImproveDistribution(t *testing.T) {
 	}
 
 	// Verify that the actual number of virtual nodes matches expected
-	if len(ring.sortedHashes) != numNodes*VirtualNodes {
-		t.Errorf("expected %d virtual node hashes, got %d", numNodes*VirtualNodes, len(ring.sortedHashes))
+	if len(ring.sortedHashes) != numNodes*DefaultVirtualNodes {
+		t.Errorf("expected %d virtual node hashes, got %d", numNodes*DefaultVirtualNodes, len(ring.sortedHashes))
 	}
 }

View File

@@ -44,8 +44,8 @@ func NewLeaderElection(nodeID string, natsConn *nats.Conn, callbacks LeaderElect
Bucket: "aether-leader-election", Bucket: "aether-leader-election",
Description: "Aether cluster leader election coordination", Description: "Aether cluster leader election coordination",
TTL: LeaderLeaseTimeout * 2, // Auto-cleanup expired leases TTL: LeaderLeaseTimeout * 2, // Auto-cleanup expired leases
MaxBytes: 1024 * 1024, // 1MB max MaxBytes: 1024 * 1024, // 1MB max
Replicas: 1, // Single replica for simplicity Replicas: 1, // Single replica for simplicity
}) })
if err != nil { if err != nil {
// Try to get existing KV store // Try to get existing KV store

View File

@@ -12,25 +12,28 @@ import (
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
// VMRegistry provides access to local VM information for cluster operations // VMRegistry provides access to local VM information for cluster operations.
// Implementations must provide thread-safe access to VM data.
type VMRegistry interface { type VMRegistry interface {
GetActiveVMs() map[string]interface{} // VirtualMachine interface to avoid import cycles // GetActiveVMs returns a map of actor IDs to their VirtualMachine instances
GetActiveVMs() map[string]VirtualMachine
// GetShard returns the shard number for a given actor ID
GetShard(actorID string) int GetShard(actorID string) int
} }
// ClusterManager coordinates distributed VM operations across the cluster // ClusterManager coordinates distributed VM operations across the cluster
type ClusterManager struct { type ClusterManager struct {
nodeID string nodeID string
nodes map[string]*NodeInfo nodes map[string]*NodeInfo
nodeUpdates chan NodeUpdate nodeUpdates chan NodeUpdate
shardMap *ShardMap shardMap *ShardMap
hashRing *ConsistentHashRing hashRing *ConsistentHashRing
election *LeaderElection election *LeaderElection
natsConn *nats.Conn natsConn *nats.Conn
ctx context.Context ctx context.Context
mutex sync.RWMutex mutex sync.RWMutex
logger *log.Logger logger *log.Logger
vmRegistry VMRegistry // Interface to access local VMs vmRegistry VMRegistry // Interface to access local VMs
} }
// NewClusterManager creates a cluster coordination manager // NewClusterManager creates a cluster coordination manager
@@ -50,13 +53,13 @@ func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context)
 	// Create leadership election with callbacks
 	callbacks := LeaderElectionCallbacks{
 		OnBecameLeader: func() {
-			cm.logger.Printf("👑 This node became the cluster leader - can initiate rebalancing")
+			cm.logger.Printf("This node became the cluster leader - can initiate rebalancing")
 		},
 		OnLostLeader: func() {
-			cm.logger.Printf("📉 This node lost cluster leadership")
+			cm.logger.Printf("This node lost cluster leadership")
 		},
 		OnNewLeader: func(leaderID string) {
-			cm.logger.Printf("🔄 Cluster leadership changed to: %s", leaderID)
+			cm.logger.Printf("Cluster leadership changed to: %s", leaderID)
 		},
 	}
@@ -71,7 +74,7 @@ func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context)
 // Start begins cluster management operations
 func (cm *ClusterManager) Start() {
-	cm.logger.Printf("🚀 Starting cluster manager")
+	cm.logger.Printf("Starting cluster manager")
 
 	// Start leader election
 	cm.election.Start()
@@ -88,7 +91,7 @@ func (cm *ClusterManager) Start() {
 // Stop gracefully stops the cluster manager
 func (cm *ClusterManager) Stop() {
-	cm.logger.Printf("🛑 Stopping cluster manager")
+	cm.logger.Printf("Stopping cluster manager")
 
 	if cm.election != nil {
 		cm.election.Stop()
@@ -138,7 +141,7 @@ func (cm *ClusterManager) GetActorsInShard(shardID int) []string {
 func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
 	var clusterMsg ClusterMessage
 	if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil {
-		cm.logger.Printf("⚠️ Invalid cluster message: %v", err)
+		cm.logger.Printf("Invalid cluster message: %v", err)
 		return
 	}
@@ -152,7 +155,7 @@ func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
 			cm.handleNodeUpdate(update)
 		}
 	default:
-		cm.logger.Printf("⚠️ Unknown cluster message type: %s", clusterMsg.Type)
+		cm.logger.Printf("Unknown cluster message type: %s", clusterMsg.Type)
 	}
 }
@@ -165,12 +168,12 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
 	case NodeJoined:
 		cm.nodes[update.Node.ID] = update.Node
 		cm.hashRing.AddNode(update.Node.ID)
-		cm.logger.Printf(" Node joined: %s", update.Node.ID)
+		cm.logger.Printf("Node joined: %s", update.Node.ID)
 
 	case NodeLeft:
 		delete(cm.nodes, update.Node.ID)
 		cm.hashRing.RemoveNode(update.Node.ID)
-		cm.logger.Printf(" Node left: %s", update.Node.ID)
+		cm.logger.Printf("Node left: %s", update.Node.ID)
 
 	case NodeUpdated:
 		if node, exists := cm.nodes[update.Node.ID]; exists {
@@ -188,7 +191,7 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
 	for _, node := range cm.nodes {
 		if now.Sub(node.LastSeen) > 90*time.Second && node.Status != NodeStatusFailed {
 			node.Status = NodeStatusFailed
 			cm.logger.Printf("Node marked as failed: %s (last seen: %s)",
 				node.ID, node.LastSeen.Format(time.RFC3339))
 		}
 	}
@@ -212,7 +215,7 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
 // handleRebalanceRequest processes cluster rebalancing requests
 func (cm *ClusterManager) handleRebalanceRequest(msg ClusterMessage) {
-	cm.logger.Printf("🔄 Handling rebalance request from %s", msg.From)
+	cm.logger.Printf("Handling rebalance request from %s", msg.From)
 
 	// Implementation would handle the specific rebalancing logic
 	// This is a simplified version
@@ -220,7 +223,7 @@ func (cm *ClusterManager) handleRebalanceRequest(msg ClusterMessage) {
 // handleMigrationRequest processes actor migration requests
 func (cm *ClusterManager) handleMigrationRequest(msg ClusterMessage) {
-	cm.logger.Printf("🚚 Handling migration request from %s", msg.From)
+	cm.logger.Printf("Handling migration request from %s", msg.From)
 
 	// Implementation would handle the specific migration logic
 	// This is a simplified version
@@ -232,7 +235,7 @@ func (cm *ClusterManager) triggerShardRebalancing(reason string) {
 		return // Only leader can initiate rebalancing
 	}
 
-	cm.logger.Printf("⚖️ Triggering shard rebalancing: %s", reason)
+	cm.logger.Printf("Triggering shard rebalancing: %s", reason)
 
 	// Get active nodes
 	var activeNodes []*NodeInfo
@@ -245,12 +248,12 @@ func (cm *ClusterManager) triggerShardRebalancing(reason string) {
 	cm.mutex.RUnlock()
 
 	if len(activeNodes) == 0 {
-		cm.logger.Printf("⚠️ No active nodes available for rebalancing")
+		cm.logger.Printf("No active nodes available for rebalancing")
 		return
 	}
 
 	// This would implement the actual rebalancing logic
-	cm.logger.Printf("🎯 Would rebalance across %d active nodes", len(activeNodes))
+	cm.logger.Printf("Would rebalance across %d active nodes", len(activeNodes))
 }
 
 // monitorNodes periodically checks node health and updates
@@ -279,7 +282,7 @@ func (cm *ClusterManager) checkNodeHealth() {
 	for _, node := range cm.nodes {
 		if now.Sub(node.LastSeen) > 90*time.Second && node.Status == NodeStatusActive {
 			node.Status = NodeStatusFailed
-			cm.logger.Printf("💔 Node failed: %s", node.ID)
+			cm.logger.Printf("Node failed: %s", node.ID)
 		}
 	}
 }

View File

@@ -33,8 +33,26 @@ type ShardManager struct {
 	replication int
 }
 
-// NewShardManager creates a new shard manager
+// NewShardManager creates a new shard manager with default configuration
 func NewShardManager(shardCount, replication int) *ShardManager {
+	return NewShardManagerWithConfig(ShardConfig{
+		ShardCount:        shardCount,
+		ReplicationFactor: replication,
+	})
+}
+
+// NewShardManagerWithConfig creates a new shard manager with custom configuration
+func NewShardManagerWithConfig(config ShardConfig) *ShardManager {
+	// Apply defaults for zero values
+	shardCount := config.ShardCount
+	if shardCount == 0 {
+		shardCount = DefaultNumShards
+	}
+	replication := config.ReplicationFactor
+	if replication == 0 {
+		replication = 1
+	}
+
 	return &ShardManager{
 		shardCount: shardCount,
 		shardMap:   &ShardMap{Shards: make(map[int][]string), Nodes: make(map[string]NodeInfo)},
@@ -149,6 +167,15 @@ func (sm *ShardManager) GetActorsInShard(shardID int, nodeID string, vmRegistry
 	return actors
 }
 
+// GetShardCount returns the total number of shards
+func (sm *ShardManager) GetShardCount() int {
+	return sm.shardCount
+}
+
+// GetReplicationFactor returns the replication factor
+func (sm *ShardManager) GetReplicationFactor() int {
+	return sm.replication
+}
+
 // ConsistentHashPlacement implements PlacementStrategy using consistent hashing
 type ConsistentHashPlacement struct{}
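For orientation, a hedged usage sketch of the constructors added above. This is a fragment (imports and the enclosing function are omitted), the package is referenced by its name "cluster", and the concrete values are illustrative rather than taken from the repository:

// Old call sites keep working unchanged.
ring := cluster.NewConsistentHashRing() // 150 virtual nodes per physical node
sm := cluster.NewShardManager(1024, 1)  // legacy signature, now delegates to the config path

// Injected configuration; zero values fall back to the package defaults.
customRing := cluster.NewConsistentHashRingWithConfig(cluster.HashRingConfig{VirtualNodes: 50})
customSM := cluster.NewShardManagerWithConfig(cluster.ShardConfig{
	ShardCount:        256,
	ReplicationFactor: 2,
})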

View File

@@ -4,17 +4,47 @@ import (
"time" "time"
) )
// Default configuration values
const ( const (
// NumShards defines the total number of shards in the cluster // DefaultNumShards defines the default total number of shards in the cluster
NumShards = 1024 DefaultNumShards = 1024
// VirtualNodes defines the number of virtual nodes per physical node for consistent hashing // DefaultVirtualNodes defines the default number of virtual nodes per physical node
VirtualNodes = 150 DefaultVirtualNodes = 150
// Leadership election constants // Leadership election constants
LeaderLeaseTimeout = 10 * time.Second // How long a leader lease lasts LeaderLeaseTimeout = 10 * time.Second // How long a leader lease lasts
HeartbeatInterval = 3 * time.Second // How often leader sends heartbeats HeartbeatInterval = 3 * time.Second // How often leader sends heartbeats
ElectionTimeout = 2 * time.Second // How long to wait for election ElectionTimeout = 2 * time.Second // How long to wait for election
) )
// HashRingConfig holds configuration for the consistent hash ring
type HashRingConfig struct {
// VirtualNodes is the number of virtual nodes per physical node (default: 150)
VirtualNodes int
}
// DefaultHashRingConfig returns the default hash ring configuration
func DefaultHashRingConfig() HashRingConfig {
return HashRingConfig{
VirtualNodes: DefaultVirtualNodes,
}
}
// ShardConfig holds configuration for shard management
type ShardConfig struct {
// ShardCount is the total number of shards (default: 1024)
ShardCount int
// ReplicationFactor is the number of replicas per shard (default: 1)
ReplicationFactor int
}
// DefaultShardConfig returns the default shard configuration
func DefaultShardConfig() ShardConfig {
return ShardConfig{
ShardCount: DefaultNumShards,
ReplicationFactor: 1,
}
}
// NodeStatus represents the health status of a node // NodeStatus represents the health status of a node
type NodeStatus string type NodeStatus string
@@ -30,14 +60,14 @@ type NodeInfo struct {
 	Address   string            `json:"address"`
 	Port      int               `json:"port"`
 	Status    NodeStatus        `json:"status"`
 	Capacity  float64           `json:"capacity"` // Maximum load capacity
 	Load      float64           `json:"load"`     // Current CPU/memory load
 	LastSeen  time.Time         `json:"lastSeen"` // Last heartbeat timestamp
 	Timestamp time.Time         `json:"timestamp"`
 	Metadata  map[string]string `json:"metadata"`
 	IsLeader  bool              `json:"isLeader"`
 	VMCount   int               `json:"vmCount"`  // Number of VMs on this node
 	ShardIDs  []int             `json:"shardIds"` // Shards assigned to this node
 }
 
 // NodeUpdateType represents the type of node update
@@ -57,9 +87,9 @@ type NodeUpdate struct {
 // ShardMap represents the distribution of shards across cluster nodes
 type ShardMap struct {
 	Version    uint64              `json:"version"` // Incremented on each change
 	Shards     map[int][]string    `json:"shards"`  // shard ID -> [primary, replica1, replica2]
 	Nodes      map[string]NodeInfo `json:"nodes"`   // node ID -> node info
 	UpdateTime time.Time           `json:"updateTime"`
 }
@@ -74,23 +104,23 @@ type ClusterMessage struct {
 // RebalanceRequest represents a request to rebalance shards
 type RebalanceRequest struct {
 	RequestID  string           `json:"requestId"`
 	FromNode   string           `json:"fromNode"`
 	ToNode     string           `json:"toNode"`
 	ShardIDs   []int            `json:"shardIds"`
 	Reason     string           `json:"reason"`
 	Migrations []ActorMigration `json:"migrations"`
 }
 
 // ActorMigration represents the migration of an actor between nodes
 type ActorMigration struct {
 	ActorID  string                 `json:"actorId"`
 	FromNode string                 `json:"fromNode"`
 	ToNode   string                 `json:"toNode"`
 	ShardID  int                    `json:"shardId"`
 	State    map[string]interface{} `json:"state"`
 	Version  int64                  `json:"version"`
 	Status   string                 `json:"status"` // "pending", "in_progress", "completed", "failed"
 }
 
 // LeaderElectionCallbacks defines callbacks for leadership changes
@@ -108,3 +138,68 @@ type LeadershipLease struct {
 	StartedAt time.Time `json:"startedAt"`
 }
+
+// VirtualMachine defines the interface for a virtual machine instance.
+// This interface provides the minimal contract needed by the cluster package
+// to interact with VMs without creating import cycles with the runtime package.
+type VirtualMachine interface {
+	// GetID returns the unique identifier for this VM
+	GetID() string
+	// GetActorID returns the actor ID this VM represents
+	GetActorID() string
+	// GetState returns the current state of the VM
+	GetState() VMState
+}
+
+// VMState represents the state of a virtual machine
+type VMState string
+
+const (
+	VMStateIdle    VMState = "idle"
+	VMStateRunning VMState = "running"
+	VMStatePaused  VMState = "paused"
+	VMStateStopped VMState = "stopped"
+)
+
+// RuntimeModel defines the interface for an EventStorming model that can be loaded into a runtime.
+// This decouples the cluster package from the specific eventstorming package.
+type RuntimeModel interface {
+	// GetID returns the unique identifier for this model
+	GetID() string
+	// GetName returns the name of this model
+	GetName() string
+}
+
+// RuntimeMessage defines the interface for messages that can be sent through the runtime.
+// This provides type safety for inter-actor communication without creating import cycles.
+type RuntimeMessage interface {
+	// GetTargetActorID returns the ID of the actor this message is addressed to
+	GetTargetActorID() string
+	// GetType returns the message type identifier
+	GetType() string
+}
+
+// ModelPayload is a concrete type for JSON-unmarshaling RuntimeModel payloads.
+// Use this when receiving model data over the network.
+type ModelPayload struct {
+	ID   string `json:"id"`
+	Name string `json:"name"`
+}
+
+// GetID implements RuntimeModel
+func (m *ModelPayload) GetID() string { return m.ID }
+
+// GetName implements RuntimeModel
+func (m *ModelPayload) GetName() string { return m.Name }
+
+// MessagePayload is a concrete type for JSON-unmarshaling RuntimeMessage payloads.
+// Use this when receiving message data over the network.
+type MessagePayload struct {
+	TargetActorID string `json:"targetActorId"`
+	Type          string `json:"type"`
+}
+
+// GetTargetActorID implements RuntimeMessage
+func (m *MessagePayload) GetTargetActorID() string { return m.TargetActorID }
+
+// GetType implements RuntimeMessage
+func (m *MessagePayload) GetType() string { return m.Type }

View File

@@ -13,11 +13,11 @@ import (
 // NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS
 type NATSEventBus struct {
 	*EventBus                                  // Embed base EventBus for local subscriptions
 	nc                   *nats.Conn            // NATS connection
 	subscriptions        []*nats.Subscription
 	namespaceSubscribers map[string]int        // Track number of subscribers per namespace
 	nodeID               string                // Unique ID for this node
 	mutex                sync.Mutex
 	ctx                  context.Context
 	cancel               context.CancelFunc

store/config_test.go (new file, 46 lines)
View File

@@ -0,0 +1,46 @@
package store
import (
"testing"
"time"
)
func TestDefaultJetStreamConfig(t *testing.T) {
config := DefaultJetStreamConfig()
if config.StreamRetention != DefaultStreamRetention {
t.Errorf("expected StreamRetention=%v, got %v", DefaultStreamRetention, config.StreamRetention)
}
if config.ReplicaCount != DefaultReplicaCount {
t.Errorf("expected ReplicaCount=%d, got %d", DefaultReplicaCount, config.ReplicaCount)
}
}
func TestJetStreamConfigDefaults(t *testing.T) {
t.Run("default stream retention is 1 year", func(t *testing.T) {
expected := 365 * 24 * time.Hour
if DefaultStreamRetention != expected {
t.Errorf("expected DefaultStreamRetention=%v, got %v", expected, DefaultStreamRetention)
}
})
t.Run("default replica count is 1", func(t *testing.T) {
if DefaultReplicaCount != 1 {
t.Errorf("expected DefaultReplicaCount=1, got %d", DefaultReplicaCount)
}
})
}
func TestJetStreamConfigCustomValues(t *testing.T) {
config := JetStreamConfig{
StreamRetention: 30 * 24 * time.Hour, // 30 days
ReplicaCount: 3,
}
if config.StreamRetention != 30*24*time.Hour {
t.Errorf("expected StreamRetention=30 days, got %v", config.StreamRetention)
}
if config.ReplicaCount != 3 {
t.Errorf("expected ReplicaCount=3, got %d", config.ReplicaCount)
}
}

View File

@@ -11,29 +11,65 @@ import (
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
// Default configuration values for JetStream event store
const (
DefaultStreamRetention = 365 * 24 * time.Hour // 1 year
DefaultReplicaCount = 1
)
// JetStreamConfig holds configuration options for JetStreamEventStore
type JetStreamConfig struct {
// StreamRetention is how long to keep events (default: 1 year)
StreamRetention time.Duration
// ReplicaCount is the number of replicas for high availability (default: 1)
ReplicaCount int
}
// DefaultJetStreamConfig returns the default configuration
func DefaultJetStreamConfig() JetStreamConfig {
return JetStreamConfig{
StreamRetention: DefaultStreamRetention,
ReplicaCount: DefaultReplicaCount,
}
}
// JetStreamEventStore implements EventStore using NATS JetStream for persistence // JetStreamEventStore implements EventStore using NATS JetStream for persistence
type JetStreamEventStore struct { type JetStreamEventStore struct {
js nats.JetStreamContext js nats.JetStreamContext
streamName string streamName string
mu sync.Mutex // Protects version checks during SaveEvent config JetStreamConfig
versions map[string]int64 // actorID -> latest version cache mu sync.Mutex // Protects version checks during SaveEvent
versions map[string]int64 // actorID -> latest version cache
} }
// NewJetStreamEventStore creates a new JetStream-based event store // NewJetStreamEventStore creates a new JetStream-based event store with default configuration
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) { func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
}
// NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration
func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) {
js, err := natsConn.JetStream() js, err := natsConn.JetStream()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get JetStream context: %w", err) return nil, fmt.Errorf("failed to get JetStream context: %w", err)
} }
// Apply defaults for zero values
if config.StreamRetention == 0 {
config.StreamRetention = DefaultStreamRetention
}
if config.ReplicaCount == 0 {
config.ReplicaCount = DefaultReplicaCount
}
// Create or update the stream // Create or update the stream
stream := &nats.StreamConfig{ stream := &nats.StreamConfig{
Name: streamName, Name: streamName,
Subjects: []string{fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName)}, Subjects: []string{fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName)},
Storage: nats.FileStorage, Storage: nats.FileStorage,
Retention: nats.LimitsPolicy, Retention: nats.LimitsPolicy,
MaxAge: 365 * 24 * time.Hour, // Keep events for 1 year MaxAge: config.StreamRetention,
Replicas: 1, // Can be increased for HA Replicas: config.ReplicaCount,
} }
_, err = js.AddStream(stream) _, err = js.AddStream(stream)
@@ -44,6 +80,7 @@ func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamE
 	return &JetStreamEventStore{
 		js:         js,
 		streamName: streamName,
+		config:     config,
 		versions:   make(map[string]int64),
 	}, nil
 }
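A hedged usage sketch of the new store constructor. This is a fragment that assumes imports of "log", "time", "github.com/nats-io/nats.go", and the project's store package; the stream name "AETHER" is a placeholder, not taken from the repository:

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
	log.Fatal(err)
}

// 30-day retention and 3 replicas instead of the 1-year / single-replica defaults.
es, err := store.NewJetStreamEventStoreWithConfig(nc, "AETHER", store.JetStreamConfig{
	StreamRetention: 30 * 24 * time.Hour,
	ReplicaCount:    3,
})
if err != nil {
	log.Fatal(err)
}
_ = es

Because zero values fall back to the defaults, passing store.JetStreamConfig{} behaves the same as calling the original NewJetStreamEventStore constructor.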