Compare commits

Comparing b630258f60 ... e77a3a9868 (5 commits; author and date columns not shown):

- e77a3a9868
- 8df36cac7a
- b759c7fb97
- eaff315782
- c757bb76f3
@@ -44,5 +44,4 @@
 // - Leader election ensures coordination continues despite node failures
 // - Actor migration allows rebalancing when cluster topology changes
 // - Graceful shutdown with proper resource cleanup
-//
 package cluster
cluster/config_test.go (new file, 125 lines)
@@ -0,0 +1,125 @@
package cluster

import (
    "testing"
)

func TestDefaultHashRingConfig(t *testing.T) {
    config := DefaultHashRingConfig()

    if config.VirtualNodes != DefaultVirtualNodes {
        t.Errorf("expected VirtualNodes=%d, got %d", DefaultVirtualNodes, config.VirtualNodes)
    }
}

func TestDefaultShardConfig(t *testing.T) {
    config := DefaultShardConfig()

    if config.ShardCount != DefaultNumShards {
        t.Errorf("expected ShardCount=%d, got %d", DefaultNumShards, config.ShardCount)
    }
    if config.ReplicationFactor != 1 {
        t.Errorf("expected ReplicationFactor=1, got %d", config.ReplicationFactor)
    }
}

func TestNewConsistentHashRingWithConfig(t *testing.T) {
    t.Run("custom virtual nodes", func(t *testing.T) {
        config := HashRingConfig{VirtualNodes: 50}
        ring := NewConsistentHashRingWithConfig(config)

        ring.AddNode("test-node")

        if len(ring.sortedHashes) != 50 {
            t.Errorf("expected 50 virtual nodes, got %d", len(ring.sortedHashes))
        }
        if ring.GetVirtualNodes() != 50 {
            t.Errorf("expected GetVirtualNodes()=50, got %d", ring.GetVirtualNodes())
        }
    })

    t.Run("zero value uses default", func(t *testing.T) {
        config := HashRingConfig{VirtualNodes: 0}
        ring := NewConsistentHashRingWithConfig(config)

        ring.AddNode("test-node")

        if len(ring.sortedHashes) != DefaultVirtualNodes {
            t.Errorf("expected %d virtual nodes, got %d", DefaultVirtualNodes, len(ring.sortedHashes))
        }
    })

    t.Run("default constructor uses default config", func(t *testing.T) {
        ring := NewConsistentHashRing()

        ring.AddNode("test-node")

        if len(ring.sortedHashes) != DefaultVirtualNodes {
            t.Errorf("expected %d virtual nodes, got %d", DefaultVirtualNodes, len(ring.sortedHashes))
        }
    })
}

func TestNewShardManagerWithConfig(t *testing.T) {
    t.Run("custom shard count", func(t *testing.T) {
        config := ShardConfig{ShardCount: 256, ReplicationFactor: 2}
        sm := NewShardManagerWithConfig(config)

        if sm.GetShardCount() != 256 {
            t.Errorf("expected shard count 256, got %d", sm.GetShardCount())
        }
        if sm.GetReplicationFactor() != 2 {
            t.Errorf("expected replication factor 2, got %d", sm.GetReplicationFactor())
        }
    })

    t.Run("zero values use defaults", func(t *testing.T) {
        config := ShardConfig{ShardCount: 0, ReplicationFactor: 0}
        sm := NewShardManagerWithConfig(config)

        if sm.GetShardCount() != DefaultNumShards {
            t.Errorf("expected shard count %d, got %d", DefaultNumShards, sm.GetShardCount())
        }
        if sm.GetReplicationFactor() != 1 {
            t.Errorf("expected replication factor 1, got %d", sm.GetReplicationFactor())
        }
    })

    t.Run("legacy constructor still works", func(t *testing.T) {
        sm := NewShardManager(512, 3)

        if sm.GetShardCount() != 512 {
            t.Errorf("expected shard count 512, got %d", sm.GetShardCount())
        }
        if sm.GetReplicationFactor() != 3 {
            t.Errorf("expected replication factor 3, got %d", sm.GetReplicationFactor())
        }
    })
}

func TestShardManagerGetShard_DifferentShardCounts(t *testing.T) {
    testCases := []struct {
        shardCount int
    }{
        {shardCount: 16},
        {shardCount: 64},
        {shardCount: 256},
        {shardCount: 1024},
        {shardCount: 4096},
    }

    for _, tc := range testCases {
        t.Run("shardCount="+string(rune(tc.shardCount)), func(t *testing.T) {
            sm := NewShardManagerWithConfig(ShardConfig{ShardCount: tc.shardCount})

            // Verify all actor IDs map to valid shard range
            for i := 0; i < 1000; i++ {
                actorID := "actor-" + string(rune(i))
                shard := sm.GetShard(actorID)
                if shard < 0 || shard >= tc.shardCount {
                    t.Errorf("shard %d out of range [0, %d)", shard, tc.shardCount)
                }
            }
        })
    }
}
@@ -12,7 +12,7 @@ import (
 type DistributedVM struct {
     nodeID       string
     cluster      *ClusterManager
-    localRuntime Runtime // Interface to avoid import cycles
+    localRuntime Runtime
     sharding     *ShardManager
     discovery    *NodeDiscovery
     natsConn     *nats.Conn
@@ -20,17 +20,29 @@ type DistributedVM struct {
     cancel       context.CancelFunc
 }

-// Runtime interface to avoid import cycles with main aether package
+// Runtime defines the interface for a local runtime that executes actors.
+// This interface decouples the cluster package from specific runtime implementations.
 type Runtime interface {
+    // Start initializes and starts the runtime
     Start() error
-    LoadModel(model interface{}) error
-    SendMessage(message interface{}) error
+    // LoadModel loads an EventStorming model into the runtime
+    LoadModel(model RuntimeModel) error
+    // SendMessage sends a message to an actor in the runtime
+    SendMessage(message RuntimeMessage) error
 }

-// DistributedVMRegistry implements VMRegistry using DistributedVM's local runtime and sharding
+// DistributedVMRegistry implements VMRegistry using DistributedVM's local runtime and sharding.
+// It provides the cluster manager with access to VM information without import cycles.
 type DistributedVMRegistry struct {
-    runtime  interface{} // Runtime interface to avoid import cycles
+    vmProvider VMProvider
     sharding *ShardManager
+}
+
+// VMProvider defines an interface for accessing VMs from a runtime.
+// This is used by DistributedVMRegistry to get VM information.
+type VMProvider interface {
+    // GetActiveVMs returns a map of actor IDs to their VirtualMachine instances
+    GetActiveVMs() map[string]VirtualMachine
 }

 // NewDistributedVM creates a distributed VM runtime cluster node
@@ -67,16 +79,19 @@ func NewDistributedVM(nodeID string, natsURLs []string, localRuntime Runtime) (*
         cancel:       cancel,
     }

-    // Create VM registry and connect it to cluster manager
-    vmRegistry := &DistributedVMRegistry{
-        runtime:  localRuntime,
-        sharding: sharding,
-    }
-    cluster.SetVMRegistry(vmRegistry)
-
     return dvm, nil
 }

+// SetVMProvider sets the VM provider for the distributed VM registry.
+// This should be called after the runtime is fully initialized.
+func (dvm *DistributedVM) SetVMProvider(provider VMProvider) {
+    vmRegistry := &DistributedVMRegistry{
+        vmProvider: provider,
+        sharding:   dvm.sharding,
+    }
+    dvm.cluster.SetVMRegistry(vmRegistry)
+}
+
 // Start begins the distributed VM cluster node
 func (dvm *DistributedVM) Start() error {
     // Start local runtime
@@ -103,7 +118,7 @@ func (dvm *DistributedVM) Stop() {
 }

 // LoadModel distributes EventStorming model across the cluster with VM templates
-func (dvm *DistributedVM) LoadModel(model interface{}) error {
+func (dvm *DistributedVM) LoadModel(model RuntimeModel) error {
     // Load model locally first
     if err := dvm.localRuntime.LoadModel(model); err != nil {
         return fmt.Errorf("failed to load model locally: %w", err)
@@ -121,7 +136,7 @@ func (dvm *DistributedVM) LoadModel(model interface{}) error {
 }

 // SendMessage routes messages across the distributed cluster
-func (dvm *DistributedVM) SendMessage(message interface{}) error {
+func (dvm *DistributedVM) SendMessage(message RuntimeMessage) error {
     // This is a simplified implementation
     // In practice, this would determine the target node based on sharding
     // and route the message appropriately
@@ -162,15 +177,29 @@ func (dvm *DistributedVM) handleClusterMessage(msg *nats.Msg) {
     switch clusterMsg.Type {
     case "load_model":
         // Handle model loading from other nodes
-        if model := clusterMsg.Payload; model != nil {
-            dvm.localRuntime.LoadModel(model)
-        }
+        // Re-marshal and unmarshal to convert map[string]interface{} to concrete type
+        payloadBytes, err := json.Marshal(clusterMsg.Payload)
+        if err != nil {
+            return
+        }
+        var model ModelPayload
+        if err := json.Unmarshal(payloadBytes, &model); err != nil {
+            return
+        }
+        dvm.localRuntime.LoadModel(&model)

     case "route_message":
         // Handle message routing from other nodes
-        if message := clusterMsg.Payload; message != nil {
-            dvm.localRuntime.SendMessage(message)
-        }
+        // Re-marshal and unmarshal to convert map[string]interface{} to concrete type
+        payloadBytes, err := json.Marshal(clusterMsg.Payload)
+        if err != nil {
+            return
+        }
+        var message MessagePayload
+        if err := json.Unmarshal(payloadBytes, &message); err != nil {
+            return
+        }
+        dvm.localRuntime.SendMessage(&message)

     case "rebalance":
         // Handle shard rebalancing requests
@@ -200,19 +229,20 @@ func (dvm *DistributedVM) GetClusterInfo() map[string]interface{} {
     nodes := dvm.cluster.GetNodes()

     return map[string]interface{}{
         "nodeId":    dvm.nodeID,
         "isLeader":  dvm.cluster.IsLeader(),
         "leader":    dvm.cluster.GetLeader(),
         "nodeCount": len(nodes),
         "nodes":     nodes,
     }
 }

-// GetActiveVMs returns a map of active VMs (implementation depends on runtime)
-func (dvr *DistributedVMRegistry) GetActiveVMs() map[string]interface{} {
-    // This would need to access the actual runtime's VM registry
-    // For now, return empty map to avoid import cycles
-    return make(map[string]interface{})
+// GetActiveVMs returns a map of active VMs from the VM provider
+func (dvr *DistributedVMRegistry) GetActiveVMs() map[string]VirtualMachine {
+    if dvr.vmProvider == nil {
+        return make(map[string]VirtualMachine)
+    }
+    return dvr.vmProvider.GetActiveVMs()
 }

 // GetShard returns the shard number for the given actor ID
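For illustration only, not part of the commits above: a minimal sketch of how a runtime-side provider might satisfy the new VMProvider interface and be wired in through SetVMProvider after the runtime is initialized. The names staticVMProvider, exampleVM, and wireProvider are hypothetical, and the sketch assumes it sits inside the cluster package alongside the types defined in this diff.

package cluster

// exampleVM is a hypothetical VM implementation used only for illustration.
type exampleVM struct {
    id      string
    actorID string
    state   VMState
}

func (v *exampleVM) GetID() string      { return v.id }
func (v *exampleVM) GetActorID() string { return v.actorID }
func (v *exampleVM) GetState() VMState  { return v.state }

// staticVMProvider satisfies VMProvider with a fixed in-memory map.
type staticVMProvider struct {
    vms map[string]VirtualMachine
}

func (p *staticVMProvider) GetActiveVMs() map[string]VirtualMachine {
    return p.vms
}

// wireProvider hands the provider to the distributed VM so the cluster
// manager can observe local VMs through the DistributedVMRegistry.
func wireProvider(dvm *DistributedVM) {
    provider := &staticVMProvider{
        vms: map[string]VirtualMachine{
            "order-42": &exampleVM{id: "vm-1", actorID: "order-42", state: VMStateRunning},
        },
    }
    dvm.SetVMProvider(provider)
}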
@@ -12,13 +12,24 @@ type ConsistentHashRing struct {
     ring         map[uint32]string // hash -> node ID
     sortedHashes []uint32          // sorted hash keys
     nodes        map[string]bool   // active nodes
+    virtualNodes int               // number of virtual nodes per physical node
 }

-// NewConsistentHashRing creates a new consistent hash ring
+// NewConsistentHashRing creates a new consistent hash ring with default configuration
 func NewConsistentHashRing() *ConsistentHashRing {
+    return NewConsistentHashRingWithConfig(DefaultHashRingConfig())
+}
+
+// NewConsistentHashRingWithConfig creates a new consistent hash ring with custom configuration
+func NewConsistentHashRingWithConfig(config HashRingConfig) *ConsistentHashRing {
+    virtualNodes := config.VirtualNodes
+    if virtualNodes == 0 {
+        virtualNodes = DefaultVirtualNodes
+    }
     return &ConsistentHashRing{
         ring:  make(map[uint32]string),
         nodes: make(map[string]bool),
+        virtualNodes: virtualNodes,
     }
 }

@@ -31,7 +42,7 @@ func (chr *ConsistentHashRing) AddNode(nodeID string) {
     chr.nodes[nodeID] = true

     // Add virtual nodes for better distribution
-    for i := 0; i < VirtualNodes; i++ {
+    for i := 0; i < chr.virtualNodes; i++ {
         virtualKey := fmt.Sprintf("%s:%d", nodeID, i)
         hash := chr.hash(virtualKey)
         chr.ring[hash] = nodeID
@@ -103,3 +114,8 @@ func (chr *ConsistentHashRing) GetNodes() []string {
 func (chr *ConsistentHashRing) IsEmpty() bool {
     return len(chr.nodes) == 0
 }
+
+// GetVirtualNodes returns the number of virtual nodes per physical node
+func (chr *ConsistentHashRing) GetVirtualNodes() int {
+    return chr.virtualNodes
+}
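For illustration only, not part of the commits above: a minimal usage sketch of the new hash ring constructors introduced in this diff. The function name exampleHashRingUsage is hypothetical; the sketch assumes it lives inside the cluster package.

package cluster

import "fmt"

// exampleHashRingUsage builds one ring with a reduced virtual-node count
// (useful for small test clusters) and one with the defaults.
func exampleHashRingUsage() {
    ring := NewConsistentHashRingWithConfig(HashRingConfig{VirtualNodes: 50})
    ring.AddNode("node-a")
    ring.AddNode("node-b")

    // A zero-value config falls back to DefaultVirtualNodes (150).
    defaultRing := NewConsistentHashRingWithConfig(HashRingConfig{})
    defaultRing.AddNode("node-a")

    fmt.Println(ring.GetVirtualNodes(), defaultRing.GetVirtualNodes()) // 50 150
}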
@@ -42,7 +42,7 @@ func TestAddNode(t *testing.T) {
     }

     // Verify virtual nodes were added
-    expectedVirtualNodes := VirtualNodes
+    expectedVirtualNodes := DefaultVirtualNodes
     if len(ring.sortedHashes) != expectedVirtualNodes {
         t.Errorf("expected %d virtual nodes, got %d", expectedVirtualNodes, len(ring.sortedHashes))
     }
@@ -86,7 +86,7 @@ func TestAddNode_MultipleNodes(t *testing.T) {
         t.Errorf("expected 3 nodes, got %d", len(nodes))
     }

-    expectedHashes := VirtualNodes * 3
+    expectedHashes := DefaultVirtualNodes * 3
     if len(ring.sortedHashes) != expectedHashes {
         t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes))
     }
@@ -118,7 +118,7 @@ func TestRemoveNode(t *testing.T) {
     }

     // Verify virtual nodes were removed
-    expectedHashes := VirtualNodes
+    expectedHashes := DefaultVirtualNodes
     if len(ring.sortedHashes) != expectedHashes {
         t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes))
     }
@@ -321,7 +321,7 @@ func TestRingBehavior_ManyNodes(t *testing.T) {
     }

     // Verify virtual nodes count
-    expectedHashes := numNodes * VirtualNodes
+    expectedHashes := numNodes * DefaultVirtualNodes
     if len(ring.sortedHashes) != expectedHashes {
         t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes))
     }
@@ -355,7 +355,7 @@ func TestRingBehavior_ManyNodes(t *testing.T) {
     }
 }

-func TestVirtualNodes_ImproveDistribution(t *testing.T) {
+func TestDefaultVirtualNodes_ImproveDistribution(t *testing.T) {
     // Test that virtual nodes actually improve distribution
     // by comparing with a theoretical single-hash-per-node scenario

@@ -386,7 +386,7 @@ func TestVirtualNodes_ImproveDistribution(t *testing.T) {
     stdDev := math.Sqrt(sumSquaredDiff / float64(numNodes))
     coefficientOfVariation := stdDev / expectedPerNode

-    // With VirtualNodes=150, we expect good distribution
+    // With DefaultVirtualNodes=150, we expect good distribution
     // Coefficient of variation should be low (< 15%)
     if coefficientOfVariation > 0.15 {
         t.Errorf("distribution has high coefficient of variation: %.2f%% (expected < 15%%)",
@@ -394,8 +394,8 @@ func TestVirtualNodes_ImproveDistribution(t *testing.T) {
     }

     // Verify that the actual number of virtual nodes matches expected
-    if len(ring.sortedHashes) != numNodes*VirtualNodes {
-        t.Errorf("expected %d virtual node hashes, got %d", numNodes*VirtualNodes, len(ring.sortedHashes))
+    if len(ring.sortedHashes) != numNodes*DefaultVirtualNodes {
+        t.Errorf("expected %d virtual node hashes, got %d", numNodes*DefaultVirtualNodes, len(ring.sortedHashes))
     }
 }
@@ -44,8 +44,8 @@ func NewLeaderElection(nodeID string, natsConn *nats.Conn, callbacks LeaderElect
     Bucket:      "aether-leader-election",
     Description: "Aether cluster leader election coordination",
     TTL:         LeaderLeaseTimeout * 2, // Auto-cleanup expired leases
     MaxBytes:    1024 * 1024,            // 1MB max
     Replicas:    1,                      // Single replica for simplicity
 })
 if err != nil {
     // Try to get existing KV store
@@ -12,25 +12,28 @@ import (
     "github.com/nats-io/nats.go"
 )

-// VMRegistry provides access to local VM information for cluster operations
+// VMRegistry provides access to local VM information for cluster operations.
+// Implementations must provide thread-safe access to VM data.
 type VMRegistry interface {
-    GetActiveVMs() map[string]interface{} // VirtualMachine interface to avoid import cycles
+    // GetActiveVMs returns a map of actor IDs to their VirtualMachine instances
+    GetActiveVMs() map[string]VirtualMachine
+    // GetShard returns the shard number for a given actor ID
     GetShard(actorID string) int
 }

 // ClusterManager coordinates distributed VM operations across the cluster
 type ClusterManager struct {
     nodeID      string
     nodes       map[string]*NodeInfo
     nodeUpdates chan NodeUpdate
     shardMap    *ShardMap
     hashRing    *ConsistentHashRing
     election    *LeaderElection
     natsConn    *nats.Conn
     ctx         context.Context
     mutex       sync.RWMutex
     logger      *log.Logger
     vmRegistry  VMRegistry // Interface to access local VMs
 }

 // NewClusterManager creates a cluster coordination manager
@@ -50,13 +53,13 @@ func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context)
     // Create leadership election with callbacks
     callbacks := LeaderElectionCallbacks{
         OnBecameLeader: func() {
-            cm.logger.Printf("👑 This node became the cluster leader - can initiate rebalancing")
+            cm.logger.Printf("This node became the cluster leader - can initiate rebalancing")
         },
         OnLostLeader: func() {
-            cm.logger.Printf("📉 This node lost cluster leadership")
+            cm.logger.Printf("This node lost cluster leadership")
         },
         OnNewLeader: func(leaderID string) {
-            cm.logger.Printf("🔄 Cluster leadership changed to: %s", leaderID)
+            cm.logger.Printf("Cluster leadership changed to: %s", leaderID)
         },
     }

@@ -71,7 +74,7 @@ func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context)

 // Start begins cluster management operations
 func (cm *ClusterManager) Start() {
-    cm.logger.Printf("🚀 Starting cluster manager")
+    cm.logger.Printf("Starting cluster manager")

     // Start leader election
     cm.election.Start()
@@ -88,7 +91,7 @@ func (cm *ClusterManager) Start() {

 // Stop gracefully stops the cluster manager
 func (cm *ClusterManager) Stop() {
-    cm.logger.Printf("🛑 Stopping cluster manager")
+    cm.logger.Printf("Stopping cluster manager")

     if cm.election != nil {
         cm.election.Stop()
@@ -138,7 +141,7 @@ func (cm *ClusterManager) GetActorsInShard(shardID int) []string {
 func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
     var clusterMsg ClusterMessage
     if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil {
-        cm.logger.Printf("⚠️ Invalid cluster message: %v", err)
+        cm.logger.Printf("Invalid cluster message: %v", err)
         return
     }

@@ -152,7 +155,7 @@ func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
             cm.handleNodeUpdate(update)
         }
     default:
-        cm.logger.Printf("⚠️ Unknown cluster message type: %s", clusterMsg.Type)
+        cm.logger.Printf("Unknown cluster message type: %s", clusterMsg.Type)
     }
 }

@@ -165,12 +168,12 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
     case NodeJoined:
         cm.nodes[update.Node.ID] = update.Node
         cm.hashRing.AddNode(update.Node.ID)
-        cm.logger.Printf("➕ Node joined: %s", update.Node.ID)
+        cm.logger.Printf("Node joined: %s", update.Node.ID)

     case NodeLeft:
         delete(cm.nodes, update.Node.ID)
         cm.hashRing.RemoveNode(update.Node.ID)
-        cm.logger.Printf("➖ Node left: %s", update.Node.ID)
+        cm.logger.Printf("Node left: %s", update.Node.ID)

     case NodeUpdated:
         if node, exists := cm.nodes[update.Node.ID]; exists {
@@ -188,7 +191,7 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
         for _, node := range cm.nodes {
             if now.Sub(node.LastSeen) > 90*time.Second && node.Status != NodeStatusFailed {
                 node.Status = NodeStatusFailed
-                cm.logger.Printf("❌ Node marked as failed: %s (last seen: %s)",
+                cm.logger.Printf("Node marked as failed: %s (last seen: %s)",
                     node.ID, node.LastSeen.Format(time.RFC3339))
             }
         }
@@ -212,7 +215,7 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {

 // handleRebalanceRequest processes cluster rebalancing requests
 func (cm *ClusterManager) handleRebalanceRequest(msg ClusterMessage) {
-    cm.logger.Printf("🔄 Handling rebalance request from %s", msg.From)
+    cm.logger.Printf("Handling rebalance request from %s", msg.From)

     // Implementation would handle the specific rebalancing logic
     // This is a simplified version
@@ -220,7 +223,7 @@ func (cm *ClusterManager) handleRebalanceRequest(msg ClusterMessage) {

 // handleMigrationRequest processes actor migration requests
 func (cm *ClusterManager) handleMigrationRequest(msg ClusterMessage) {
-    cm.logger.Printf("🚚 Handling migration request from %s", msg.From)
+    cm.logger.Printf("Handling migration request from %s", msg.From)

     // Implementation would handle the specific migration logic
     // This is a simplified version
@@ -232,7 +235,7 @@ func (cm *ClusterManager) triggerShardRebalancing(reason string) {
         return // Only leader can initiate rebalancing
     }

-    cm.logger.Printf("⚖️ Triggering shard rebalancing: %s", reason)
+    cm.logger.Printf("Triggering shard rebalancing: %s", reason)

     // Get active nodes
     var activeNodes []*NodeInfo
@@ -245,12 +248,12 @@ func (cm *ClusterManager) triggerShardRebalancing(reason string) {
     cm.mutex.RUnlock()

     if len(activeNodes) == 0 {
-        cm.logger.Printf("⚠️ No active nodes available for rebalancing")
+        cm.logger.Printf("No active nodes available for rebalancing")
         return
     }

     // This would implement the actual rebalancing logic
-    cm.logger.Printf("🎯 Would rebalance across %d active nodes", len(activeNodes))
+    cm.logger.Printf("Would rebalance across %d active nodes", len(activeNodes))
 }

 // monitorNodes periodically checks node health and updates
@@ -279,7 +282,7 @@ func (cm *ClusterManager) checkNodeHealth() {
     for _, node := range cm.nodes {
         if now.Sub(node.LastSeen) > 90*time.Second && node.Status == NodeStatusActive {
             node.Status = NodeStatusFailed
-            cm.logger.Printf("💔 Node failed: %s", node.ID)
+            cm.logger.Printf("Node failed: %s", node.ID)
         }
     }
 }
@@ -33,8 +33,26 @@ type ShardManager struct {
     replication int
 }

-// NewShardManager creates a new shard manager
+// NewShardManager creates a new shard manager with default configuration
 func NewShardManager(shardCount, replication int) *ShardManager {
+    return NewShardManagerWithConfig(ShardConfig{
+        ShardCount:        shardCount,
+        ReplicationFactor: replication,
+    })
+}
+
+// NewShardManagerWithConfig creates a new shard manager with custom configuration
+func NewShardManagerWithConfig(config ShardConfig) *ShardManager {
+    // Apply defaults for zero values
+    shardCount := config.ShardCount
+    if shardCount == 0 {
+        shardCount = DefaultNumShards
+    }
+    replication := config.ReplicationFactor
+    if replication == 0 {
+        replication = 1
+    }
+
     return &ShardManager{
         shardCount: shardCount,
         shardMap:   &ShardMap{Shards: make(map[int][]string), Nodes: make(map[string]NodeInfo)},
@@ -149,6 +167,15 @@ func (sm *ShardManager) GetActorsInShard(shardID int, nodeID string, vmRegistry
     return actors
 }

+// GetShardCount returns the total number of shards
+func (sm *ShardManager) GetShardCount() int {
+    return sm.shardCount
+}
+
+// GetReplicationFactor returns the replication factor
+func (sm *ShardManager) GetReplicationFactor() int {
+    return sm.replication
+}
+
 // ConsistentHashPlacement implements PlacementStrategy using consistent hashing
 type ConsistentHashPlacement struct{}
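For illustration only, not part of the commits above: a minimal usage sketch of the new shard manager configuration API. The function name exampleShardManagerUsage is hypothetical; the sketch assumes it lives inside the cluster package.

package cluster

import "fmt"

// exampleShardManagerUsage builds a shard manager with a custom shard count
// and replication factor; zero values fall back to DefaultNumShards (1024)
// and a replication factor of 1.
func exampleShardManagerUsage() {
    sm := NewShardManagerWithConfig(ShardConfig{ShardCount: 256, ReplicationFactor: 2})

    shard := sm.GetShard("order-42") // always within [0, 256)
    fmt.Println(shard, sm.GetShardCount(), sm.GetReplicationFactor())
}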
cluster/types.go
@@ -4,17 +4,47 @@ import (
     "time"
 )

+// Default configuration values
 const (
-    // NumShards defines the total number of shards in the cluster
-    NumShards = 1024
-    // VirtualNodes defines the number of virtual nodes per physical node for consistent hashing
-    VirtualNodes = 150
+    // DefaultNumShards defines the default total number of shards in the cluster
+    DefaultNumShards = 1024
+    // DefaultVirtualNodes defines the default number of virtual nodes per physical node
+    DefaultVirtualNodes = 150
     // Leadership election constants
     LeaderLeaseTimeout = 10 * time.Second // How long a leader lease lasts
     HeartbeatInterval  = 3 * time.Second  // How often leader sends heartbeats
     ElectionTimeout    = 2 * time.Second  // How long to wait for election
 )

+// HashRingConfig holds configuration for the consistent hash ring
+type HashRingConfig struct {
+    // VirtualNodes is the number of virtual nodes per physical node (default: 150)
+    VirtualNodes int
+}
+
+// DefaultHashRingConfig returns the default hash ring configuration
+func DefaultHashRingConfig() HashRingConfig {
+    return HashRingConfig{
+        VirtualNodes: DefaultVirtualNodes,
+    }
+}
+
+// ShardConfig holds configuration for shard management
+type ShardConfig struct {
+    // ShardCount is the total number of shards (default: 1024)
+    ShardCount int
+    // ReplicationFactor is the number of replicas per shard (default: 1)
+    ReplicationFactor int
+}
+
+// DefaultShardConfig returns the default shard configuration
+func DefaultShardConfig() ShardConfig {
+    return ShardConfig{
+        ShardCount:        DefaultNumShards,
+        ReplicationFactor: 1,
+    }
+}
+
 // NodeStatus represents the health status of a node
 type NodeStatus string

@@ -30,14 +60,14 @@ type NodeInfo struct {
     Address   string            `json:"address"`
     Port      int               `json:"port"`
     Status    NodeStatus        `json:"status"`
     Capacity  float64           `json:"capacity"`  // Maximum load capacity
     Load      float64           `json:"load"`      // Current CPU/memory load
     LastSeen  time.Time         `json:"lastSeen"`  // Last heartbeat timestamp
     Timestamp time.Time         `json:"timestamp"`
     Metadata  map[string]string `json:"metadata"`
     IsLeader  bool              `json:"isLeader"`
     VMCount   int               `json:"vmCount"`   // Number of VMs on this node
     ShardIDs  []int             `json:"shardIds"`  // Shards assigned to this node
 }

 // NodeUpdateType represents the type of node update
@@ -57,9 +87,9 @@ type NodeUpdate struct {

 // ShardMap represents the distribution of shards across cluster nodes
 type ShardMap struct {
     Version    uint64              `json:"version"` // Incremented on each change
     Shards     map[int][]string    `json:"shards"`  // shard ID -> [primary, replica1, replica2]
     Nodes      map[string]NodeInfo `json:"nodes"`   // node ID -> node info
     UpdateTime time.Time           `json:"updateTime"`
 }

@@ -74,23 +104,23 @@ type ClusterMessage struct {

 // RebalanceRequest represents a request to rebalance shards
 type RebalanceRequest struct {
     RequestID  string           `json:"requestId"`
     FromNode   string           `json:"fromNode"`
     ToNode     string           `json:"toNode"`
     ShardIDs   []int            `json:"shardIds"`
     Reason     string           `json:"reason"`
     Migrations []ActorMigration `json:"migrations"`
 }

 // ActorMigration represents the migration of an actor between nodes
 type ActorMigration struct {
     ActorID  string                 `json:"actorId"`
     FromNode string                 `json:"fromNode"`
     ToNode   string                 `json:"toNode"`
     ShardID  int                    `json:"shardId"`
     State    map[string]interface{} `json:"state"`
     Version  int64                  `json:"version"`
     Status   string                 `json:"status"` // "pending", "in_progress", "completed", "failed"
 }

 // LeaderElectionCallbacks defines callbacks for leadership changes
@@ -108,3 +138,68 @@ type LeadershipLease struct {
     StartedAt time.Time `json:"startedAt"`
 }
+
+// VirtualMachine defines the interface for a virtual machine instance.
+// This interface provides the minimal contract needed by the cluster package
+// to interact with VMs without creating import cycles with the runtime package.
+type VirtualMachine interface {
+    // GetID returns the unique identifier for this VM
+    GetID() string
+    // GetActorID returns the actor ID this VM represents
+    GetActorID() string
+    // GetState returns the current state of the VM
+    GetState() VMState
+}
+
+// VMState represents the state of a virtual machine
+type VMState string
+
+const (
+    VMStateIdle    VMState = "idle"
+    VMStateRunning VMState = "running"
+    VMStatePaused  VMState = "paused"
+    VMStateStopped VMState = "stopped"
+)
+
+// RuntimeModel defines the interface for an EventStorming model that can be loaded into a runtime.
+// This decouples the cluster package from the specific eventstorming package.
+type RuntimeModel interface {
+    // GetID returns the unique identifier for this model
+    GetID() string
+    // GetName returns the name of this model
+    GetName() string
+}
+
+// RuntimeMessage defines the interface for messages that can be sent through the runtime.
+// This provides type safety for inter-actor communication without creating import cycles.
+type RuntimeMessage interface {
+    // GetTargetActorID returns the ID of the actor this message is addressed to
+    GetTargetActorID() string
+    // GetType returns the message type identifier
+    GetType() string
+}
+
+// ModelPayload is a concrete type for JSON-unmarshaling RuntimeModel payloads.
+// Use this when receiving model data over the network.
+type ModelPayload struct {
+    ID   string `json:"id"`
+    Name string `json:"name"`
+}
+
+// GetID implements RuntimeModel
+func (m *ModelPayload) GetID() string { return m.ID }
+
+// GetName implements RuntimeModel
+func (m *ModelPayload) GetName() string { return m.Name }
+
+// MessagePayload is a concrete type for JSON-unmarshaling RuntimeMessage payloads.
+// Use this when receiving message data over the network.
+type MessagePayload struct {
+    TargetActorID string `json:"targetActorId"`
+    Type          string `json:"type"`
+}
+
+// GetTargetActorID implements RuntimeMessage
+func (m *MessagePayload) GetTargetActorID() string { return m.TargetActorID }
+
+// GetType implements RuntimeMessage
+func (m *MessagePayload) GetType() string { return m.Type }
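For illustration only, not part of the commits above: a minimal sketch of the payload round trip that the new concrete types enable. A handler can decode a network payload into MessagePayload and still pass it around as the RuntimeMessage interface. The function name examplePayloadRoundTrip and the JSON literal are hypothetical; the sketch assumes it lives inside the cluster package.

package cluster

import (
    "encoding/json"
    "fmt"
)

// examplePayloadRoundTrip decodes raw JSON into a MessagePayload and then
// uses it through the RuntimeMessage interface.
func examplePayloadRoundTrip() error {
    raw := []byte(`{"targetActorId":"order-42","type":"PlaceOrder"}`)

    var payload MessagePayload
    if err := json.Unmarshal(raw, &payload); err != nil {
        return err
    }

    var msg RuntimeMessage = &payload // *MessagePayload satisfies RuntimeMessage
    fmt.Println(msg.GetTargetActorID(), msg.GetType())
    return nil
}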
@@ -13,11 +13,11 @@ import (
 // NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS
 type NATSEventBus struct {
     *EventBus                           // Embed base EventBus for local subscriptions
     nc                   *nats.Conn     // NATS connection
     subscriptions        []*nats.Subscription
     namespaceSubscribers map[string]int // Track number of subscribers per namespace
     nodeID               string         // Unique ID for this node
     mutex                sync.Mutex
     ctx                  context.Context
     cancel               context.CancelFunc
store/config_test.go (new file, 46 lines)
@@ -0,0 +1,46 @@
package store

import (
    "testing"
    "time"
)

func TestDefaultJetStreamConfig(t *testing.T) {
    config := DefaultJetStreamConfig()

    if config.StreamRetention != DefaultStreamRetention {
        t.Errorf("expected StreamRetention=%v, got %v", DefaultStreamRetention, config.StreamRetention)
    }
    if config.ReplicaCount != DefaultReplicaCount {
        t.Errorf("expected ReplicaCount=%d, got %d", DefaultReplicaCount, config.ReplicaCount)
    }
}

func TestJetStreamConfigDefaults(t *testing.T) {
    t.Run("default stream retention is 1 year", func(t *testing.T) {
        expected := 365 * 24 * time.Hour
        if DefaultStreamRetention != expected {
            t.Errorf("expected DefaultStreamRetention=%v, got %v", expected, DefaultStreamRetention)
        }
    })

    t.Run("default replica count is 1", func(t *testing.T) {
        if DefaultReplicaCount != 1 {
            t.Errorf("expected DefaultReplicaCount=1, got %d", DefaultReplicaCount)
        }
    })
}

func TestJetStreamConfigCustomValues(t *testing.T) {
    config := JetStreamConfig{
        StreamRetention: 30 * 24 * time.Hour, // 30 days
        ReplicaCount:    3,
    }

    if config.StreamRetention != 30*24*time.Hour {
        t.Errorf("expected StreamRetention=30 days, got %v", config.StreamRetention)
    }
    if config.ReplicaCount != 3 {
        t.Errorf("expected ReplicaCount=3, got %d", config.ReplicaCount)
    }
}
@@ -11,30 +11,66 @@ import (
     "github.com/nats-io/nats.go"
 )

+// Default configuration values for JetStream event store
+const (
+    DefaultStreamRetention = 365 * 24 * time.Hour // 1 year
+    DefaultReplicaCount    = 1
+)
+
+// JetStreamConfig holds configuration options for JetStreamEventStore
+type JetStreamConfig struct {
+    // StreamRetention is how long to keep events (default: 1 year)
+    StreamRetention time.Duration
+    // ReplicaCount is the number of replicas for high availability (default: 1)
+    ReplicaCount int
+}
+
+// DefaultJetStreamConfig returns the default configuration
+func DefaultJetStreamConfig() JetStreamConfig {
+    return JetStreamConfig{
+        StreamRetention: DefaultStreamRetention,
+        ReplicaCount:    DefaultReplicaCount,
+    }
+}
+
 // JetStreamEventStore implements EventStore using NATS JetStream for persistence.
 // It also implements EventStoreWithErrors to report malformed events during replay.
 type JetStreamEventStore struct {
     js         nats.JetStreamContext
     streamName string
+    config     JetStreamConfig
     mu         sync.Mutex       // Protects version checks during SaveEvent
     versions   map[string]int64 // actorID -> latest version cache
 }

-// NewJetStreamEventStore creates a new JetStream-based event store
+// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
 func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
+    return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
+}
+
+// NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration
+func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) {
     js, err := natsConn.JetStream()
     if err != nil {
         return nil, fmt.Errorf("failed to get JetStream context: %w", err)
     }

+    // Apply defaults for zero values
+    if config.StreamRetention == 0 {
+        config.StreamRetention = DefaultStreamRetention
+    }
+    if config.ReplicaCount == 0 {
+        config.ReplicaCount = DefaultReplicaCount
+    }
+
     // Create or update the stream
     stream := &nats.StreamConfig{
         Name:      streamName,
         Subjects:  []string{fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName)},
         Storage:   nats.FileStorage,
         Retention: nats.LimitsPolicy,
-        MaxAge:    365 * 24 * time.Hour, // Keep events for 1 year
-        Replicas:  1, // Can be increased for HA
+        MaxAge:    config.StreamRetention,
+        Replicas:  config.ReplicaCount,
     }

     _, err = js.AddStream(stream)
@@ -45,6 +81,7 @@ func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamE
     return &JetStreamEventStore{
         js:         js,
         streamName: streamName,
+        config:     config,
         versions:   make(map[string]int64),
     }, nil
 }
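For illustration only, not part of the commits above: a minimal usage sketch of the new JetStream store configuration. The function name exampleJetStreamStore and the stream name "aether-events" are hypothetical; the sketch assumes it lives inside the store package.

package store

import (
    "time"

    "github.com/nats-io/nats.go"
)

// exampleJetStreamStore creates a store with 30-day retention and 3 replicas;
// zero values in the config fall back to DefaultStreamRetention and DefaultReplicaCount.
func exampleJetStreamStore(nc *nats.Conn) (*JetStreamEventStore, error) {
    cfg := JetStreamConfig{
        StreamRetention: 30 * 24 * time.Hour,
        ReplicaCount:    3,
    }
    return NewJetStreamEventStoreWithConfig(nc, "aether-events", cfg)
}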