Compare commits


1 Commit

b630258f60 Handle malformed events during JetStream replay with proper error reporting
All checks were successful: CI / build (pull_request) passed in 17s
Add ReplayError and ReplayResult types to capture information about
malformed events encountered during replay. This allows callers to
inspect and handle corrupted data rather than having it silently skipped.

Key changes:
- Add ReplayError type with sequence number, raw data, and underlying error
- Add ReplayResult type containing both successfully parsed events and errors
- Add EventStoreWithErrors interface for stores that can report replay errors
- Implement GetEventsWithErrors on JetStreamEventStore
- Update GetEvents to maintain backward compatibility (still skips malformed)
- Add comprehensive unit tests for the new types

This addresses the issue of silent data loss during event-sourced replay
by giving callers visibility into data quality issues.

Closes #39

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 15:32:46 +01:00
13 changed files with 113 additions and 491 deletions
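The replay types named in the commit message do not appear in the hunks shown below. As a rough sketch of what they might look like, based only on the description above (any field or method detail beyond ReplayError, ReplayResult, EventStoreWithErrors, and GetEventsWithErrors is an assumption, and Event stands in for the store package's existing event type):

package store

import "fmt"

// Event is a stand-in for the store package's existing event type,
// which is not visible in this diff.
type Event struct {
    ActorID string
    Version int64
    Data    []byte
}

// ReplayError describes one malformed event encountered during replay:
// its stream sequence, the raw bytes, and the underlying decode error.
type ReplayError struct {
    Sequence uint64
    RawData  []byte
    Err      error
}

func (e ReplayError) Error() string {
    return fmt.Sprintf("malformed event at sequence %d: %v", e.Sequence, e.Err)
}

// ReplayResult carries both the successfully parsed events and the errors
// encountered while replaying an actor's stream.
type ReplayResult struct {
    Events []Event
    Errors []ReplayError
}

// EventStoreWithErrors is implemented by stores that can report replay
// errors instead of silently skipping malformed data.
type EventStoreWithErrors interface {
    GetEventsWithErrors(actorID string) (*ReplayResult, error)
}

A caller could then log or inspect result.Errors instead of losing malformed events, while the unchanged GetEvents keeps its skip-on-error behaviour for backward compatibility.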

View File

@@ -44,4 +44,5 @@
 // - Leader election ensures coordination continues despite node failures
 // - Actor migration allows rebalancing when cluster topology changes
 // - Graceful shutdown with proper resource cleanup
+//
 package cluster

View File

@@ -1,125 +0,0 @@
-package cluster
-
-import (
-    "testing"
-)
-
-func TestDefaultHashRingConfig(t *testing.T) {
-    config := DefaultHashRingConfig()
-
-    if config.VirtualNodes != DefaultVirtualNodes {
-        t.Errorf("expected VirtualNodes=%d, got %d", DefaultVirtualNodes, config.VirtualNodes)
-    }
-}
-
-func TestDefaultShardConfig(t *testing.T) {
-    config := DefaultShardConfig()
-
-    if config.ShardCount != DefaultNumShards {
-        t.Errorf("expected ShardCount=%d, got %d", DefaultNumShards, config.ShardCount)
-    }
-    if config.ReplicationFactor != 1 {
-        t.Errorf("expected ReplicationFactor=1, got %d", config.ReplicationFactor)
-    }
-}
-
-func TestNewConsistentHashRingWithConfig(t *testing.T) {
-    t.Run("custom virtual nodes", func(t *testing.T) {
-        config := HashRingConfig{VirtualNodes: 50}
-        ring := NewConsistentHashRingWithConfig(config)
-        ring.AddNode("test-node")
-
-        if len(ring.sortedHashes) != 50 {
-            t.Errorf("expected 50 virtual nodes, got %d", len(ring.sortedHashes))
-        }
-        if ring.GetVirtualNodes() != 50 {
-            t.Errorf("expected GetVirtualNodes()=50, got %d", ring.GetVirtualNodes())
-        }
-    })
-
-    t.Run("zero value uses default", func(t *testing.T) {
-        config := HashRingConfig{VirtualNodes: 0}
-        ring := NewConsistentHashRingWithConfig(config)
-        ring.AddNode("test-node")
-
-        if len(ring.sortedHashes) != DefaultVirtualNodes {
-            t.Errorf("expected %d virtual nodes, got %d", DefaultVirtualNodes, len(ring.sortedHashes))
-        }
-    })
-
-    t.Run("default constructor uses default config", func(t *testing.T) {
-        ring := NewConsistentHashRing()
-        ring.AddNode("test-node")
-
-        if len(ring.sortedHashes) != DefaultVirtualNodes {
-            t.Errorf("expected %d virtual nodes, got %d", DefaultVirtualNodes, len(ring.sortedHashes))
-        }
-    })
-}
-
-func TestNewShardManagerWithConfig(t *testing.T) {
-    t.Run("custom shard count", func(t *testing.T) {
-        config := ShardConfig{ShardCount: 256, ReplicationFactor: 2}
-        sm := NewShardManagerWithConfig(config)
-
-        if sm.GetShardCount() != 256 {
-            t.Errorf("expected shard count 256, got %d", sm.GetShardCount())
-        }
-        if sm.GetReplicationFactor() != 2 {
-            t.Errorf("expected replication factor 2, got %d", sm.GetReplicationFactor())
-        }
-    })
-
-    t.Run("zero values use defaults", func(t *testing.T) {
-        config := ShardConfig{ShardCount: 0, ReplicationFactor: 0}
-        sm := NewShardManagerWithConfig(config)
-
-        if sm.GetShardCount() != DefaultNumShards {
-            t.Errorf("expected shard count %d, got %d", DefaultNumShards, sm.GetShardCount())
-        }
-        if sm.GetReplicationFactor() != 1 {
-            t.Errorf("expected replication factor 1, got %d", sm.GetReplicationFactor())
-        }
-    })
-
-    t.Run("legacy constructor still works", func(t *testing.T) {
-        sm := NewShardManager(512, 3)
-
-        if sm.GetShardCount() != 512 {
-            t.Errorf("expected shard count 512, got %d", sm.GetShardCount())
-        }
-        if sm.GetReplicationFactor() != 3 {
-            t.Errorf("expected replication factor 3, got %d", sm.GetReplicationFactor())
-        }
-    })
-}
-
-func TestShardManagerGetShard_DifferentShardCounts(t *testing.T) {
-    testCases := []struct {
-        shardCount int
-    }{
-        {shardCount: 16},
-        {shardCount: 64},
-        {shardCount: 256},
-        {shardCount: 1024},
-        {shardCount: 4096},
-    }
-
-    for _, tc := range testCases {
-        t.Run("shardCount="+string(rune(tc.shardCount)), func(t *testing.T) {
-            sm := NewShardManagerWithConfig(ShardConfig{ShardCount: tc.shardCount})
-
-            // Verify all actor IDs map to valid shard range
-            for i := 0; i < 1000; i++ {
-                actorID := "actor-" + string(rune(i))
-                shard := sm.GetShard(actorID)
-                if shard < 0 || shard >= tc.shardCount {
-                    t.Errorf("shard %d out of range [0, %d)", shard, tc.shardCount)
-                }
-            }
-        })
-    }
-}

View File

@@ -12,7 +12,7 @@ import (
 type DistributedVM struct {
     nodeID       string
     cluster      *ClusterManager
-    localRuntime Runtime
+    localRuntime Runtime // Interface to avoid import cycles
     sharding     *ShardManager
     discovery    *NodeDiscovery
     natsConn     *nats.Conn
@@ -20,31 +20,19 @@ type DistributedVM struct {
     cancel       context.CancelFunc
 }

-// Runtime defines the interface for a local runtime that executes actors.
-// This interface decouples the cluster package from specific runtime implementations.
+// Runtime interface to avoid import cycles with main aether package
 type Runtime interface {
-    // Start initializes and starts the runtime
     Start() error
-    // LoadModel loads an EventStorming model into the runtime
-    LoadModel(model RuntimeModel) error
-    // SendMessage sends a message to an actor in the runtime
-    SendMessage(message RuntimeMessage) error
+    LoadModel(model interface{}) error
+    SendMessage(message interface{}) error
 }

-// DistributedVMRegistry implements VMRegistry using DistributedVM's local runtime and sharding.
-// It provides the cluster manager with access to VM information without import cycles.
+// DistributedVMRegistry implements VMRegistry using DistributedVM's local runtime and sharding
 type DistributedVMRegistry struct {
-    vmProvider VMProvider
+    runtime  interface{} // Runtime interface to avoid import cycles
     sharding *ShardManager
 }

-// VMProvider defines an interface for accessing VMs from a runtime.
-// This is used by DistributedVMRegistry to get VM information.
-type VMProvider interface {
-    // GetActiveVMs returns a map of actor IDs to their VirtualMachine instances
-    GetActiveVMs() map[string]VirtualMachine
-}
-
 // NewDistributedVM creates a distributed VM runtime cluster node
 func NewDistributedVM(nodeID string, natsURLs []string, localRuntime Runtime) (*DistributedVM, error) {
     ctx, cancel := context.WithCancel(context.Background())
@@ -79,17 +67,14 @@ func NewDistributedVM(nodeID string, natsURLs []string, localRuntime Runtime) (*
         cancel:       cancel,
     }

-    return dvm, nil
-}
-
-// SetVMProvider sets the VM provider for the distributed VM registry.
-// This should be called after the runtime is fully initialized.
-func (dvm *DistributedVM) SetVMProvider(provider VMProvider) {
+    // Create VM registry and connect it to cluster manager
     vmRegistry := &DistributedVMRegistry{
-        vmProvider: provider,
-        sharding:   dvm.sharding,
+        runtime:  localRuntime,
+        sharding: sharding,
     }
-    dvm.cluster.SetVMRegistry(vmRegistry)
+    cluster.SetVMRegistry(vmRegistry)
+
+    return dvm, nil
 }

 // Start begins the distributed VM cluster node
@@ -118,7 +103,7 @@
 }

 // LoadModel distributes EventStorming model across the cluster with VM templates
-func (dvm *DistributedVM) LoadModel(model RuntimeModel) error {
+func (dvm *DistributedVM) LoadModel(model interface{}) error {
     // Load model locally first
     if err := dvm.localRuntime.LoadModel(model); err != nil {
         return fmt.Errorf("failed to load model locally: %w", err)
@@ -136,7 +121,7 @@ func (dvm *DistributedVM) LoadModel(model RuntimeModel) error {
 }

 // SendMessage routes messages across the distributed cluster
-func (dvm *DistributedVM) SendMessage(message RuntimeMessage) error {
+func (dvm *DistributedVM) SendMessage(message interface{}) error {
     // This is a simplified implementation
     // In practice, this would determine the target node based on sharding
     // and route the message appropriately
@@ -177,29 +162,15 @@ func (dvm *DistributedVM) handleClusterMessage(msg *nats.Msg) {
     switch clusterMsg.Type {
     case "load_model":
         // Handle model loading from other nodes
-        // Re-marshal and unmarshal to convert map[string]interface{} to concrete type
-        payloadBytes, err := json.Marshal(clusterMsg.Payload)
-        if err != nil {
-            return
-        }
-        var model ModelPayload
-        if err := json.Unmarshal(payloadBytes, &model); err != nil {
-            return
-        }
-        dvm.localRuntime.LoadModel(&model)
+        if model := clusterMsg.Payload; model != nil {
+            dvm.localRuntime.LoadModel(model)
+        }

     case "route_message":
         // Handle message routing from other nodes
-        // Re-marshal and unmarshal to convert map[string]interface{} to concrete type
-        payloadBytes, err := json.Marshal(clusterMsg.Payload)
-        if err != nil {
-            return
-        }
-        var message MessagePayload
-        if err := json.Unmarshal(payloadBytes, &message); err != nil {
-            return
-        }
-        dvm.localRuntime.SendMessage(&message)
+        if message := clusterMsg.Payload; message != nil {
+            dvm.localRuntime.SendMessage(message)
+        }

     case "rebalance":
         // Handle shard rebalancing requests
@@ -237,12 +208,11 @@ func (dvm *DistributedVM) GetClusterInfo() map[string]interface{} {
     }
 }

-// GetActiveVMs returns a map of active VMs from the VM provider
-func (dvr *DistributedVMRegistry) GetActiveVMs() map[string]VirtualMachine {
-    if dvr.vmProvider == nil {
-        return make(map[string]VirtualMachine)
-    }
-    return dvr.vmProvider.GetActiveVMs()
+// GetActiveVMs returns a map of active VMs (implementation depends on runtime)
+func (dvr *DistributedVMRegistry) GetActiveVMs() map[string]interface{} {
+    // This would need to access the actual runtime's VM registry
+    // For now, return empty map to avoid import cycles
+    return make(map[string]interface{})
 }

 // GetShard returns the shard number for the given actor ID
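The interface{} signatures in the new version of this file trade type safety for decoupling: any runtime can satisfy cluster.Runtime without the cluster package importing its types, but payloads have to be type-asserted at the receiving end. A hypothetical runtime in another package might satisfy the interface like this (the package and type names are illustrative, not from the repository):

package myruntime

import "fmt"

// LocalRuntime is a hypothetical runtime that satisfies cluster.Runtime
// purely structurally; it never imports the cluster package.
type LocalRuntime struct {
    started bool
}

func (r *LocalRuntime) Start() error {
    r.started = true
    return nil
}

func (r *LocalRuntime) LoadModel(model interface{}) error {
    // After a JSON round trip, payloads typically arrive as map[string]interface{}.
    m, ok := model.(map[string]interface{})
    if !ok {
        return fmt.Errorf("unexpected model payload type %T", model)
    }
    _ = m // decode and load the model here
    return nil
}

func (r *LocalRuntime) SendMessage(message interface{}) error {
    // Same pattern: assert to the expected concrete shape before routing.
    _ = message
    return nil
}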

View File

@@ -12,24 +12,13 @@ type ConsistentHashRing struct {
     ring         map[uint32]string // hash -> node ID
     sortedHashes []uint32          // sorted hash keys
     nodes        map[string]bool   // active nodes
-    virtualNodes int               // number of virtual nodes per physical node
 }

-// NewConsistentHashRing creates a new consistent hash ring with default configuration
+// NewConsistentHashRing creates a new consistent hash ring
 func NewConsistentHashRing() *ConsistentHashRing {
-    return NewConsistentHashRingWithConfig(DefaultHashRingConfig())
-}
-
-// NewConsistentHashRingWithConfig creates a new consistent hash ring with custom configuration
-func NewConsistentHashRingWithConfig(config HashRingConfig) *ConsistentHashRing {
-    virtualNodes := config.VirtualNodes
-    if virtualNodes == 0 {
-        virtualNodes = DefaultVirtualNodes
-    }
     return &ConsistentHashRing{
-        ring:         make(map[uint32]string),
-        nodes:        make(map[string]bool),
-        virtualNodes: virtualNodes,
+        ring:  make(map[uint32]string),
+        nodes: make(map[string]bool),
     }
 }
@@ -42,7 +31,7 @@ func (chr *ConsistentHashRing) AddNode(nodeID string) {
     chr.nodes[nodeID] = true

     // Add virtual nodes for better distribution
-    for i := 0; i < chr.virtualNodes; i++ {
+    for i := 0; i < VirtualNodes; i++ {
         virtualKey := fmt.Sprintf("%s:%d", nodeID, i)
         hash := chr.hash(virtualKey)
         chr.ring[hash] = nodeID
@@ -114,8 +103,3 @@ func (chr *ConsistentHashRing) GetNodes() []string {
 func (chr *ConsistentHashRing) IsEmpty() bool {
     return len(chr.nodes) == 0
 }
-
-// GetVirtualNodes returns the number of virtual nodes per physical node
-func (chr *ConsistentHashRing) GetVirtualNodes() int {
-    return chr.virtualNodes
-}
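The ring's lookup path is not part of these hunks, but the usual mechanism behind a consistent hash ring with virtual nodes is: hash every node 150 times, keep the hashes sorted, and map a key to the first virtual-node hash at or after the key's own hash, wrapping around at the end of the sorted slice. A standalone sketch of that lookup, not the repository's implementation:

package main

import (
    "fmt"
    "hash/fnv"
    "sort"
)

func hash32(s string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(s))
    return h.Sum32()
}

func main() {
    const virtualNodes = 150
    ring := map[uint32]string{}
    var sortedHashes []uint32

    // Each physical node contributes 150 points on the ring.
    for _, node := range []string{"node-a", "node-b", "node-c"} {
        for i := 0; i < virtualNodes; i++ {
            h := hash32(fmt.Sprintf("%s:%d", node, i))
            ring[h] = node
            sortedHashes = append(sortedHashes, h)
        }
    }
    sort.Slice(sortedHashes, func(i, j int) bool { return sortedHashes[i] < sortedHashes[j] })

    // Lookup: first virtual-node hash >= key hash, wrapping to index 0.
    key := hash32("actor-42")
    idx := sort.Search(len(sortedHashes), func(i int) bool { return sortedHashes[i] >= key })
    if idx == len(sortedHashes) {
        idx = 0
    }
    fmt.Println("actor-42 ->", ring[sortedHashes[idx]])
}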

View File

@@ -42,7 +42,7 @@ func TestAddNode(t *testing.T) {
     }

     // Verify virtual nodes were added
-    expectedVirtualNodes := DefaultVirtualNodes
+    expectedVirtualNodes := VirtualNodes
     if len(ring.sortedHashes) != expectedVirtualNodes {
         t.Errorf("expected %d virtual nodes, got %d", expectedVirtualNodes, len(ring.sortedHashes))
     }
@@ -86,7 +86,7 @@ func TestAddNode_MultipleNodes(t *testing.T) {
         t.Errorf("expected 3 nodes, got %d", len(nodes))
     }

-    expectedHashes := DefaultVirtualNodes * 3
+    expectedHashes := VirtualNodes * 3
     if len(ring.sortedHashes) != expectedHashes {
         t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes))
     }
@@ -118,7 +118,7 @@ func TestRemoveNode(t *testing.T) {
     }

     // Verify virtual nodes were removed
-    expectedHashes := DefaultVirtualNodes
+    expectedHashes := VirtualNodes
     if len(ring.sortedHashes) != expectedHashes {
         t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes))
     }
@@ -321,7 +321,7 @@ func TestRingBehavior_ManyNodes(t *testing.T) {
     }

     // Verify virtual nodes count
-    expectedHashes := numNodes * DefaultVirtualNodes
+    expectedHashes := numNodes * VirtualNodes
     if len(ring.sortedHashes) != expectedHashes {
         t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes))
     }
@@ -355,7 +355,7 @@ func TestRingBehavior_ManyNodes(t *testing.T) {
     }
 }

-func TestDefaultVirtualNodes_ImproveDistribution(t *testing.T) {
+func TestVirtualNodes_ImproveDistribution(t *testing.T) {
     // Test that virtual nodes actually improve distribution
     // by comparing with a theoretical single-hash-per-node scenario
@@ -386,7 +386,7 @@ func TestDefaultVirtualNodes_ImproveDistribution(t *testing.T) {
     stdDev := math.Sqrt(sumSquaredDiff / float64(numNodes))
     coefficientOfVariation := stdDev / expectedPerNode

-    // With DefaultVirtualNodes=150, we expect good distribution
+    // With VirtualNodes=150, we expect good distribution
     // Coefficient of variation should be low (< 15%)
     if coefficientOfVariation > 0.15 {
         t.Errorf("distribution has high coefficient of variation: %.2f%% (expected < 15%%)",
@@ -394,8 +394,8 @@ func TestDefaultVirtualNodes_ImproveDistribution(t *testing.T) {
     }

     // Verify that the actual number of virtual nodes matches expected
-    if len(ring.sortedHashes) != numNodes*DefaultVirtualNodes {
-        t.Errorf("expected %d virtual node hashes, got %d", numNodes*DefaultVirtualNodes, len(ring.sortedHashes))
+    if len(ring.sortedHashes) != numNodes*VirtualNodes {
+        t.Errorf("expected %d virtual node hashes, got %d", numNodes*VirtualNodes, len(ring.sortedHashes))
     }
 }

View File

@@ -12,12 +12,9 @@ import (
     "github.com/nats-io/nats.go"
 )

-// VMRegistry provides access to local VM information for cluster operations.
-// Implementations must provide thread-safe access to VM data.
+// VMRegistry provides access to local VM information for cluster operations
 type VMRegistry interface {
-    // GetActiveVMs returns a map of actor IDs to their VirtualMachine instances
-    GetActiveVMs() map[string]VirtualMachine
-    // GetShard returns the shard number for a given actor ID
+    GetActiveVMs() map[string]interface{} // VirtualMachine interface to avoid import cycles
     GetShard(actorID string) int
 }
@@ -53,13 +50,13 @@ func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context)

     // Create leadership election with callbacks
     callbacks := LeaderElectionCallbacks{
         OnBecameLeader: func() {
-            cm.logger.Printf("This node became the cluster leader - can initiate rebalancing")
+            cm.logger.Printf("👑 This node became the cluster leader - can initiate rebalancing")
         },
         OnLostLeader: func() {
-            cm.logger.Printf("This node lost cluster leadership")
+            cm.logger.Printf("📉 This node lost cluster leadership")
         },
         OnNewLeader: func(leaderID string) {
-            cm.logger.Printf("Cluster leadership changed to: %s", leaderID)
+            cm.logger.Printf("🔄 Cluster leadership changed to: %s", leaderID)
         },
     }
@@ -74,7 +71,7 @@

 // Start begins cluster management operations
 func (cm *ClusterManager) Start() {
-    cm.logger.Printf("Starting cluster manager")
+    cm.logger.Printf("🚀 Starting cluster manager")

     // Start leader election
     cm.election.Start()
@@ -91,7 +88,7 @@

 // Stop gracefully stops the cluster manager
 func (cm *ClusterManager) Stop() {
-    cm.logger.Printf("Stopping cluster manager")
+    cm.logger.Printf("🛑 Stopping cluster manager")

     if cm.election != nil {
         cm.election.Stop()
@@ -141,7 +138,7 @@ func (cm *ClusterManager) GetActorsInShard(shardID int) []string {
 func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
     var clusterMsg ClusterMessage
     if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil {
-        cm.logger.Printf("Invalid cluster message: %v", err)
+        cm.logger.Printf("⚠️ Invalid cluster message: %v", err)
         return
     }
@@ -155,7 +152,7 @@ func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
             cm.handleNodeUpdate(update)
         }
     default:
-        cm.logger.Printf("Unknown cluster message type: %s", clusterMsg.Type)
+        cm.logger.Printf("⚠️ Unknown cluster message type: %s", clusterMsg.Type)
     }
 }
@@ -168,12 +165,12 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
     case NodeJoined:
         cm.nodes[update.Node.ID] = update.Node
         cm.hashRing.AddNode(update.Node.ID)
-        cm.logger.Printf("Node joined: %s", update.Node.ID)
+        cm.logger.Printf(" Node joined: %s", update.Node.ID)

     case NodeLeft:
         delete(cm.nodes, update.Node.ID)
         cm.hashRing.RemoveNode(update.Node.ID)
-        cm.logger.Printf("Node left: %s", update.Node.ID)
+        cm.logger.Printf(" Node left: %s", update.Node.ID)

     case NodeUpdated:
         if node, exists := cm.nodes[update.Node.ID]; exists {
@@ -191,7 +188,7 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
     for _, node := range cm.nodes {
         if now.Sub(node.LastSeen) > 90*time.Second && node.Status != NodeStatusFailed {
             node.Status = NodeStatusFailed
             cm.logger.Printf("Node marked as failed: %s (last seen: %s)",
                 node.ID, node.LastSeen.Format(time.RFC3339))
         }
     }
@@ -215,7 +212,7 @@ func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {

 // handleRebalanceRequest processes cluster rebalancing requests
 func (cm *ClusterManager) handleRebalanceRequest(msg ClusterMessage) {
-    cm.logger.Printf("Handling rebalance request from %s", msg.From)
+    cm.logger.Printf("🔄 Handling rebalance request from %s", msg.From)

     // Implementation would handle the specific rebalancing logic
     // This is a simplified version
@@ -223,7 +220,7 @@

 // handleMigrationRequest processes actor migration requests
 func (cm *ClusterManager) handleMigrationRequest(msg ClusterMessage) {
-    cm.logger.Printf("Handling migration request from %s", msg.From)
+    cm.logger.Printf("🚚 Handling migration request from %s", msg.From)

     // Implementation would handle the specific migration logic
     // This is a simplified version
@@ -235,7 +232,7 @@
         return // Only leader can initiate rebalancing
     }

-    cm.logger.Printf("Triggering shard rebalancing: %s", reason)
+    cm.logger.Printf("⚖️ Triggering shard rebalancing: %s", reason)

     // Get active nodes
     var activeNodes []*NodeInfo
@@ -248,12 +245,12 @@
     cm.mutex.RUnlock()

     if len(activeNodes) == 0 {
-        cm.logger.Printf("No active nodes available for rebalancing")
+        cm.logger.Printf("⚠️ No active nodes available for rebalancing")
         return
     }

     // This would implement the actual rebalancing logic
-    cm.logger.Printf("Would rebalance across %d active nodes", len(activeNodes))
+    cm.logger.Printf("🎯 Would rebalance across %d active nodes", len(activeNodes))
 }

 // monitorNodes periodically checks node health and updates
@@ -282,7 +279,7 @@ func (cm *ClusterManager) checkNodeHealth() {
     for _, node := range cm.nodes {
         if now.Sub(node.LastSeen) > 90*time.Second && node.Status == NodeStatusActive {
             node.Status = NodeStatusFailed
-            cm.logger.Printf("Node failed: %s", node.ID)
+            cm.logger.Printf("💔 Node failed: %s", node.ID)
         }
     }
 }
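The failure-detection rule used in handleNodeUpdate and checkNodeHealth above is simple: a node that has not been seen for 90 seconds is marked failed. A standalone sketch of that rule with simplified, illustrative types:

package main

import (
    "fmt"
    "time"
)

type nodeInfo struct {
    ID       string
    LastSeen time.Time
    Failed   bool
}

// markFailed flags nodes whose last heartbeat is older than the timeout,
// mirroring the 90-second rule in the cluster manager above.
func markFailed(nodes []*nodeInfo, timeout time.Duration, now time.Time) {
    for _, n := range nodes {
        if !n.Failed && now.Sub(n.LastSeen) > timeout {
            n.Failed = true
            fmt.Printf("node failed: %s (silent for %s)\n", n.ID, now.Sub(n.LastSeen).Round(time.Second))
        }
    }
}

func main() {
    now := time.Now()
    nodes := []*nodeInfo{
        {ID: "node-a", LastSeen: now.Add(-5 * time.Second)},
        {ID: "node-b", LastSeen: now.Add(-2 * time.Minute)},
    }
    markFailed(nodes, 90*time.Second, now) // only node-b is marked failed
}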

View File

@@ -33,26 +33,8 @@ type ShardManager struct {
     replication int
 }

-// NewShardManager creates a new shard manager with default configuration
+// NewShardManager creates a new shard manager
 func NewShardManager(shardCount, replication int) *ShardManager {
-    return NewShardManagerWithConfig(ShardConfig{
-        ShardCount:        shardCount,
-        ReplicationFactor: replication,
-    })
-}
-
-// NewShardManagerWithConfig creates a new shard manager with custom configuration
-func NewShardManagerWithConfig(config ShardConfig) *ShardManager {
-    // Apply defaults for zero values
-    shardCount := config.ShardCount
-    if shardCount == 0 {
-        shardCount = DefaultNumShards
-    }
-    replication := config.ReplicationFactor
-    if replication == 0 {
-        replication = 1
-    }
     return &ShardManager{
         shardCount: shardCount,
         shardMap:   &ShardMap{Shards: make(map[int][]string), Nodes: make(map[string]NodeInfo)},
@@ -167,15 +149,6 @@ func (sm *ShardManager) GetActorsInShard(shardID int, nodeID string, vmRegistry
     return actors
 }

-// GetShardCount returns the total number of shards
-func (sm *ShardManager) GetShardCount() int {
-    return sm.shardCount
-}
-
-// GetReplicationFactor returns the replication factor
-func (sm *ShardManager) GetReplicationFactor() int {
-    return sm.replication
-}
-
 // ConsistentHashPlacement implements PlacementStrategy using consistent hashing
 type ConsistentHashPlacement struct{}
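GetShard itself is outside these hunks; the conventional mapping for a fixed shard count like 1024 is to hash the actor ID and reduce it modulo the shard count, so a given actor always lands on the same shard. An illustrative sketch, not the repository's code:

package main

import (
    "fmt"
    "hash/fnv"
)

// getShard maps an actor ID onto a stable shard in [0, shardCount).
func getShard(actorID string, shardCount int) int {
    h := fnv.New32a()
    h.Write([]byte(actorID))
    return int(h.Sum32() % uint32(shardCount))
}

func main() {
    // The same ID always hashes to the same shard, so routing stays stable
    // as long as the shard-to-node assignment is stable.
    fmt.Println(getShard("order-1234", 1024))
    fmt.Println(getShard("order-1234", 1024))
}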

View File

@@ -4,47 +4,17 @@ import (
     "time"
 )

-// Default configuration values
 const (
-    // DefaultNumShards defines the default total number of shards in the cluster
-    DefaultNumShards = 1024
+    // NumShards defines the total number of shards in the cluster
+    NumShards = 1024

-    // DefaultVirtualNodes defines the default number of virtual nodes per physical node
-    DefaultVirtualNodes = 150
+    // VirtualNodes defines the number of virtual nodes per physical node for consistent hashing
+    VirtualNodes = 150

     // Leadership election constants
     LeaderLeaseTimeout = 10 * time.Second // How long a leader lease lasts
     HeartbeatInterval  = 3 * time.Second  // How often leader sends heartbeats
     ElectionTimeout    = 2 * time.Second  // How long to wait for election
 )

-// HashRingConfig holds configuration for the consistent hash ring
-type HashRingConfig struct {
-    // VirtualNodes is the number of virtual nodes per physical node (default: 150)
-    VirtualNodes int
-}
-
-// DefaultHashRingConfig returns the default hash ring configuration
-func DefaultHashRingConfig() HashRingConfig {
-    return HashRingConfig{
-        VirtualNodes: DefaultVirtualNodes,
-    }
-}
-
-// ShardConfig holds configuration for shard management
-type ShardConfig struct {
-    // ShardCount is the total number of shards (default: 1024)
-    ShardCount int
-
-    // ReplicationFactor is the number of replicas per shard (default: 1)
-    ReplicationFactor int
-}
-
-// DefaultShardConfig returns the default shard configuration
-func DefaultShardConfig() ShardConfig {
-    return ShardConfig{
-        ShardCount:        DefaultNumShards,
-        ReplicationFactor: 1,
-    }
-}
-
 // NodeStatus represents the health status of a node
 type NodeStatus string
@@ -138,68 +108,3 @@ type LeadershipLease struct {
     StartedAt time.Time `json:"startedAt"`
 }
-
-// VirtualMachine defines the interface for a virtual machine instance.
-// This interface provides the minimal contract needed by the cluster package
-// to interact with VMs without creating import cycles with the runtime package.
-type VirtualMachine interface {
-    // GetID returns the unique identifier for this VM
-    GetID() string
-
-    // GetActorID returns the actor ID this VM represents
-    GetActorID() string
-
-    // GetState returns the current state of the VM
-    GetState() VMState
-}
-
-// VMState represents the state of a virtual machine
-type VMState string
-
-const (
-    VMStateIdle    VMState = "idle"
-    VMStateRunning VMState = "running"
-    VMStatePaused  VMState = "paused"
-    VMStateStopped VMState = "stopped"
-)
-
-// RuntimeModel defines the interface for an EventStorming model that can be loaded into a runtime.
-// This decouples the cluster package from the specific eventstorming package.
-type RuntimeModel interface {
-    // GetID returns the unique identifier for this model
-    GetID() string
-
-    // GetName returns the name of this model
-    GetName() string
-}
-
-// RuntimeMessage defines the interface for messages that can be sent through the runtime.
-// This provides type safety for inter-actor communication without creating import cycles.
-type RuntimeMessage interface {
-    // GetTargetActorID returns the ID of the actor this message is addressed to
-    GetTargetActorID() string
-
-    // GetType returns the message type identifier
-    GetType() string
-}
-
-// ModelPayload is a concrete type for JSON-unmarshaling RuntimeModel payloads.
-// Use this when receiving model data over the network.
-type ModelPayload struct {
-    ID   string `json:"id"`
-    Name string `json:"name"`
-}
-
-// GetID implements RuntimeModel
-func (m *ModelPayload) GetID() string { return m.ID }
-
-// GetName implements RuntimeModel
-func (m *ModelPayload) GetName() string { return m.Name }
-
-// MessagePayload is a concrete type for JSON-unmarshaling RuntimeMessage payloads.
-// Use this when receiving message data over the network.
-type MessagePayload struct {
-    TargetActorID string `json:"targetActorId"`
-    Type          string `json:"type"`
-}
-
-// GetTargetActorID implements RuntimeMessage
-func (m *MessagePayload) GetTargetActorID() string { return m.TargetActorID }
-
-// GetType implements RuntimeMessage
-func (m *MessagePayload) GetType() string { return m.Type }
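The deleted ModelPayload and MessagePayload types existed because of how encoding/json treats interface{} payloads: once a ClusterMessage is unmarshaled, its Payload is a map[string]interface{}, and recovering a concrete struct requires a second marshal/unmarshal pass. A standalone illustration of that behaviour (the local struct here is only for demonstration):

package main

import (
    "encoding/json"
    "fmt"
)

type clusterMessage struct {
    Type    string      `json:"type"`
    Payload interface{} `json:"payload"`
}

func main() {
    raw := []byte(`{"type":"load_model","payload":{"id":"m1","name":"orders"}}`)

    var msg clusterMessage
    if err := json.Unmarshal(raw, &msg); err != nil {
        panic(err)
    }
    fmt.Printf("%T\n", msg.Payload) // map[string]interface {}

    // Converting to a concrete type needs a second marshal/unmarshal pass,
    // which is what the deleted ModelPayload path did.
    type modelPayload struct {
        ID   string `json:"id"`
        Name string `json:"name"`
    }
    b, _ := json.Marshal(msg.Payload)
    var model modelPayload
    _ = json.Unmarshal(b, &model)
    fmt.Printf("%+v\n", model) // {ID:m1 Name:orders}
}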

View File

@@ -1,46 +0,0 @@
-package store
-
-import (
-    "testing"
-    "time"
-)
-
-func TestDefaultJetStreamConfig(t *testing.T) {
-    config := DefaultJetStreamConfig()
-
-    if config.StreamRetention != DefaultStreamRetention {
-        t.Errorf("expected StreamRetention=%v, got %v", DefaultStreamRetention, config.StreamRetention)
-    }
-    if config.ReplicaCount != DefaultReplicaCount {
-        t.Errorf("expected ReplicaCount=%d, got %d", DefaultReplicaCount, config.ReplicaCount)
-    }
-}
-
-func TestJetStreamConfigDefaults(t *testing.T) {
-    t.Run("default stream retention is 1 year", func(t *testing.T) {
-        expected := 365 * 24 * time.Hour
-        if DefaultStreamRetention != expected {
-            t.Errorf("expected DefaultStreamRetention=%v, got %v", expected, DefaultStreamRetention)
-        }
-    })
-
-    t.Run("default replica count is 1", func(t *testing.T) {
-        if DefaultReplicaCount != 1 {
-            t.Errorf("expected DefaultReplicaCount=1, got %d", DefaultReplicaCount)
-        }
-    })
-}
-
-func TestJetStreamConfigCustomValues(t *testing.T) {
-    config := JetStreamConfig{
-        StreamRetention: 30 * 24 * time.Hour, // 30 days
-        ReplicaCount:    3,
-    }
-
-    if config.StreamRetention != 30*24*time.Hour {
-        t.Errorf("expected StreamRetention=30 days, got %v", config.StreamRetention)
-    }
-    if config.ReplicaCount != 3 {
-        t.Errorf("expected ReplicaCount=3, got %d", config.ReplicaCount)
-    }
-}

View File

@@ -11,66 +11,30 @@ import (
     "github.com/nats-io/nats.go"
 )

-// Default configuration values for JetStream event store
-const (
-    DefaultStreamRetention = 365 * 24 * time.Hour // 1 year
-    DefaultReplicaCount    = 1
-)
-
-// JetStreamConfig holds configuration options for JetStreamEventStore
-type JetStreamConfig struct {
-    // StreamRetention is how long to keep events (default: 1 year)
-    StreamRetention time.Duration
-
-    // ReplicaCount is the number of replicas for high availability (default: 1)
-    ReplicaCount int
-}
-
-// DefaultJetStreamConfig returns the default configuration
-func DefaultJetStreamConfig() JetStreamConfig {
-    return JetStreamConfig{
-        StreamRetention: DefaultStreamRetention,
-        ReplicaCount:    DefaultReplicaCount,
-    }
-}
-
 // JetStreamEventStore implements EventStore using NATS JetStream for persistence.
 // It also implements EventStoreWithErrors to report malformed events during replay.
 type JetStreamEventStore struct {
     js         nats.JetStreamContext
     streamName string
-    config     JetStreamConfig
     mu         sync.Mutex       // Protects version checks during SaveEvent
     versions   map[string]int64 // actorID -> latest version cache
 }

-// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
+// NewJetStreamEventStore creates a new JetStream-based event store
 func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
-    return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
-}
-
-// NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration
-func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) {
     js, err := natsConn.JetStream()
     if err != nil {
         return nil, fmt.Errorf("failed to get JetStream context: %w", err)
     }

-    // Apply defaults for zero values
-    if config.StreamRetention == 0 {
-        config.StreamRetention = DefaultStreamRetention
-    }
-    if config.ReplicaCount == 0 {
-        config.ReplicaCount = DefaultReplicaCount
-    }
-
     // Create or update the stream
     stream := &nats.StreamConfig{
         Name:      streamName,
         Subjects:  []string{fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName)},
         Storage:   nats.FileStorage,
         Retention: nats.LimitsPolicy,
-        MaxAge:    config.StreamRetention,
-        Replicas:  config.ReplicaCount,
+        MaxAge:    365 * 24 * time.Hour, // Keep events for 1 year
+        Replicas:  1,                    // Can be increased for HA
     }

     _, err = js.AddStream(stream)
@@ -81,7 +45,6 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
     return &JetStreamEventStore{
         js:         js,
         streamName: streamName,
-        config:     config,
         versions:   make(map[string]int64),
     }, nil
 }
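The hard-coded MaxAge and Replicas above are fields of nats.StreamConfig; the "can be increased for HA" comment amounts to raising Replicas on a clustered JetStream deployment. A sketch of such a configuration, with the stream name and retention value as placeholders:

package main

import (
    "time"

    "github.com/nats-io/nats.go"
)

// haStreamConfig mirrors the stream settings used above, but with three
// replicas, which requires a clustered JetStream deployment.
func haStreamConfig(streamName string) *nats.StreamConfig {
    return &nats.StreamConfig{
        Name:      streamName,
        Subjects:  []string{streamName + ".events.>", streamName + ".snapshots.>"},
        Storage:   nats.FileStorage,
        Retention: nats.LimitsPolicy,
        MaxAge:    90 * 24 * time.Hour, // e.g. keep 90 days instead of a year
        Replicas:  3,                   // replicated for availability
    }
}

func main() {
    _ = haStreamConfig("AETHER") // placeholder stream name
}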