[Issue #38] Make configuration values injectable rather than hardcoded #43

Merged
HugoNijhuis merged 1 commit from issue-38-injectable-config into main 2026-01-10 15:10:50 +00:00
13 changed files with 353 additions and 74 deletions
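The change follows one pattern throughout: each hardcoded package-level constant becomes a field on a config struct, zero values fall back to the old defaults, and the existing constructors delegate to new ...WithConfig variants. A minimal usage sketch (illustrative only; the module import path is assumed, not part of this PR):

package main

import (
	"fmt"

	"example.com/project/cluster" // import path assumed for illustration
)

func main() {
	// New: inject a custom value instead of relying on the old constant.
	ring := cluster.NewConsistentHashRingWithConfig(cluster.HashRingConfig{VirtualNodes: 50})
	fmt.Println(ring.GetVirtualNodes()) // 50

	// Unchanged call sites still compile: the legacy constructor now
	// delegates to the config-based one with the old defaults.
	legacy := cluster.NewConsistentHashRing()
	fmt.Println(legacy.GetVirtualNodes()) // 150 (DefaultVirtualNodes)
}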

View File

@@ -44,5 +44,4 @@
// - Leader election ensures coordination continues despite node failures
// - Actor migration allows rebalancing when cluster topology changes
// - Graceful shutdown with proper resource cleanup
-//
package cluster

cluster/config_test.go (new file, 125 lines)
View File

@@ -0,0 +1,125 @@
package cluster
import (
"strconv"
"testing"
)
func TestDefaultHashRingConfig(t *testing.T) {
config := DefaultHashRingConfig()
if config.VirtualNodes != DefaultVirtualNodes {
t.Errorf("expected VirtualNodes=%d, got %d", DefaultVirtualNodes, config.VirtualNodes)
}
}
func TestDefaultShardConfig(t *testing.T) {
config := DefaultShardConfig()
if config.ShardCount != DefaultNumShards {
t.Errorf("expected ShardCount=%d, got %d", DefaultNumShards, config.ShardCount)
}
if config.ReplicationFactor != 1 {
t.Errorf("expected ReplicationFactor=1, got %d", config.ReplicationFactor)
}
}
func TestNewConsistentHashRingWithConfig(t *testing.T) {
t.Run("custom virtual nodes", func(t *testing.T) {
config := HashRingConfig{VirtualNodes: 50}
ring := NewConsistentHashRingWithConfig(config)
ring.AddNode("test-node")
if len(ring.sortedHashes) != 50 {
t.Errorf("expected 50 virtual nodes, got %d", len(ring.sortedHashes))
}
if ring.GetVirtualNodes() != 50 {
t.Errorf("expected GetVirtualNodes()=50, got %d", ring.GetVirtualNodes())
}
})
t.Run("zero value uses default", func(t *testing.T) {
config := HashRingConfig{VirtualNodes: 0}
ring := NewConsistentHashRingWithConfig(config)
ring.AddNode("test-node")
if len(ring.sortedHashes) != DefaultVirtualNodes {
t.Errorf("expected %d virtual nodes, got %d", DefaultVirtualNodes, len(ring.sortedHashes))
}
})
t.Run("default constructor uses default config", func(t *testing.T) {
ring := NewConsistentHashRing()
ring.AddNode("test-node")
if len(ring.sortedHashes) != DefaultVirtualNodes {
t.Errorf("expected %d virtual nodes, got %d", DefaultVirtualNodes, len(ring.sortedHashes))
}
})
}
func TestNewShardManagerWithConfig(t *testing.T) {
t.Run("custom shard count", func(t *testing.T) {
config := ShardConfig{ShardCount: 256, ReplicationFactor: 2}
sm := NewShardManagerWithConfig(config)
if sm.GetShardCount() != 256 {
t.Errorf("expected shard count 256, got %d", sm.GetShardCount())
}
if sm.GetReplicationFactor() != 2 {
t.Errorf("expected replication factor 2, got %d", sm.GetReplicationFactor())
}
})
t.Run("zero values use defaults", func(t *testing.T) {
config := ShardConfig{ShardCount: 0, ReplicationFactor: 0}
sm := NewShardManagerWithConfig(config)
if sm.GetShardCount() != DefaultNumShards {
t.Errorf("expected shard count %d, got %d", DefaultNumShards, sm.GetShardCount())
}
if sm.GetReplicationFactor() != 1 {
t.Errorf("expected replication factor 1, got %d", sm.GetReplicationFactor())
}
})
t.Run("legacy constructor still works", func(t *testing.T) {
sm := NewShardManager(512, 3)
if sm.GetShardCount() != 512 {
t.Errorf("expected shard count 512, got %d", sm.GetShardCount())
}
if sm.GetReplicationFactor() != 3 {
t.Errorf("expected replication factor 3, got %d", sm.GetReplicationFactor())
}
})
}
func TestShardManagerGetShard_DifferentShardCounts(t *testing.T) {
testCases := []struct {
shardCount int
}{
{shardCount: 16},
{shardCount: 64},
{shardCount: 256},
{shardCount: 1024},
{shardCount: 4096},
}
for _, tc := range testCases {
t.Run("shardCount="+string(rune(tc.shardCount)), func(t *testing.T) {
sm := NewShardManagerWithConfig(ShardConfig{ShardCount: tc.shardCount})
// Verify all actor IDs map to valid shard range
for i := 0; i < 1000; i++ {
actorID := "actor-" + string(rune(i))
shard := sm.GetShard(actorID)
if shard < 0 || shard >= tc.shardCount {
t.Errorf("shard %d out of range [0, %d)", shard, tc.shardCount)
}
}
})
}
}

View File

@@ -12,13 +12,24 @@ type ConsistentHashRing struct {
ring map[uint32]string // hash -> node ID
sortedHashes []uint32 // sorted hash keys
nodes map[string]bool // active nodes
+virtualNodes int // number of virtual nodes per physical node
}
-// NewConsistentHashRing creates a new consistent hash ring
+// NewConsistentHashRing creates a new consistent hash ring with default configuration
func NewConsistentHashRing() *ConsistentHashRing {
+return NewConsistentHashRingWithConfig(DefaultHashRingConfig())
+}
+// NewConsistentHashRingWithConfig creates a new consistent hash ring with custom configuration
+func NewConsistentHashRingWithConfig(config HashRingConfig) *ConsistentHashRing {
+virtualNodes := config.VirtualNodes
+if virtualNodes == 0 {
+virtualNodes = DefaultVirtualNodes
+}
return &ConsistentHashRing{
ring: make(map[uint32]string),
nodes: make(map[string]bool),
+virtualNodes: virtualNodes,
}
}
@@ -31,7 +42,7 @@ func (chr *ConsistentHashRing) AddNode(nodeID string) {
chr.nodes[nodeID] = true
// Add virtual nodes for better distribution
-for i := 0; i < VirtualNodes; i++ {
+for i := 0; i < chr.virtualNodes; i++ {
virtualKey := fmt.Sprintf("%s:%d", nodeID, i)
hash := chr.hash(virtualKey)
chr.ring[hash] = nodeID
@@ -103,3 +114,8 @@ func (chr *ConsistentHashRing) GetNodes() []string {
func (chr *ConsistentHashRing) IsEmpty() bool {
return len(chr.nodes) == 0
}
+// GetVirtualNodes returns the number of virtual nodes per physical node
+func (chr *ConsistentHashRing) GetVirtualNodes() int {
+return chr.virtualNodes
+}

View File

@@ -42,7 +42,7 @@ func TestAddNode(t *testing.T) {
}
// Verify virtual nodes were added
-expectedVirtualNodes := VirtualNodes
+expectedVirtualNodes := DefaultVirtualNodes
if len(ring.sortedHashes) != expectedVirtualNodes {
t.Errorf("expected %d virtual nodes, got %d", expectedVirtualNodes, len(ring.sortedHashes))
}
@@ -86,7 +86,7 @@ func TestAddNode_MultipleNodes(t *testing.T) {
t.Errorf("expected 3 nodes, got %d", len(nodes))
}
-expectedHashes := VirtualNodes * 3
+expectedHashes := DefaultVirtualNodes * 3
if len(ring.sortedHashes) != expectedHashes {
t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes))
}
@@ -118,7 +118,7 @@ func TestRemoveNode(t *testing.T) {
}
// Verify virtual nodes were removed
-expectedHashes := VirtualNodes
+expectedHashes := DefaultVirtualNodes
if len(ring.sortedHashes) != expectedHashes {
t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes))
}
@@ -321,7 +321,7 @@ func TestRingBehavior_ManyNodes(t *testing.T) {
}
// Verify virtual nodes count
-expectedHashes := numNodes * VirtualNodes
+expectedHashes := numNodes * DefaultVirtualNodes
if len(ring.sortedHashes) != expectedHashes {
t.Errorf("expected %d virtual nodes, got %d", expectedHashes, len(ring.sortedHashes))
}
@@ -355,7 +355,7 @@ func TestRingBehavior_ManyNodes(t *testing.T) {
}
}
-func TestVirtualNodes_ImproveDistribution(t *testing.T) {
+func TestDefaultVirtualNodes_ImproveDistribution(t *testing.T) {
// Test that virtual nodes actually improve distribution
// by comparing with a theoretical single-hash-per-node scenario
@@ -386,7 +386,7 @@ func TestVirtualNodes_ImproveDistribution(t *testing.T) {
stdDev := math.Sqrt(sumSquaredDiff / float64(numNodes))
coefficientOfVariation := stdDev / expectedPerNode
-// With VirtualNodes=150, we expect good distribution
+// With DefaultVirtualNodes=150, we expect good distribution
// Coefficient of variation should be low (< 15%)
if coefficientOfVariation > 0.15 {
t.Errorf("distribution has high coefficient of variation: %.2f%% (expected < 15%%)",
@@ -394,8 +394,8 @@ func TestVirtualNodes_ImproveDistribution(t *testing.T) {
}
// Verify that the actual number of virtual nodes matches expected
-if len(ring.sortedHashes) != numNodes*VirtualNodes {
-t.Errorf("expected %d virtual node hashes, got %d", numNodes*VirtualNodes, len(ring.sortedHashes))
+if len(ring.sortedHashes) != numNodes*DefaultVirtualNodes {
+t.Errorf("expected %d virtual node hashes, got %d", numNodes*DefaultVirtualNodes, len(ring.sortedHashes))
}
}

View File

@@ -33,8 +33,26 @@ type ShardManager struct {
replication int
}
-// NewShardManager creates a new shard manager
+// NewShardManager creates a new shard manager with default configuration
func NewShardManager(shardCount, replication int) *ShardManager {
+return NewShardManagerWithConfig(ShardConfig{
+ShardCount: shardCount,
+ReplicationFactor: replication,
+})
+}
+// NewShardManagerWithConfig creates a new shard manager with custom configuration
+func NewShardManagerWithConfig(config ShardConfig) *ShardManager {
+// Apply defaults for zero values
+shardCount := config.ShardCount
+if shardCount == 0 {
+shardCount = DefaultNumShards
+}
+replication := config.ReplicationFactor
+if replication == 0 {
+replication = 1
+}
return &ShardManager{
shardCount: shardCount,
shardMap: &ShardMap{Shards: make(map[int][]string), Nodes: make(map[string]NodeInfo)},
@@ -149,6 +167,15 @@ func (sm *ShardManager) GetActorsInShard(shardID int, nodeID string, vmRegistry
return actors
}
+// GetShardCount returns the total number of shards
+func (sm *ShardManager) GetShardCount() int {
+return sm.shardCount
+}
+// GetReplicationFactor returns the replication factor
+func (sm *ShardManager) GetReplicationFactor() int {
+return sm.replication
+}
// ConsistentHashPlacement implements PlacementStrategy using consistent hashing
type ConsistentHashPlacement struct{}

View File

@@ -4,17 +4,47 @@ import (
"time"
)
// Default configuration values
const (
-// NumShards defines the total number of shards in the cluster
-NumShards = 1024
-// VirtualNodes defines the number of virtual nodes per physical node for consistent hashing
-VirtualNodes = 150
+// DefaultNumShards defines the default total number of shards in the cluster
+DefaultNumShards = 1024
+// DefaultVirtualNodes defines the default number of virtual nodes per physical node
+DefaultVirtualNodes = 150
// Leadership election constants
LeaderLeaseTimeout = 10 * time.Second // How long a leader lease lasts
HeartbeatInterval = 3 * time.Second // How often leader sends heartbeats
ElectionTimeout = 2 * time.Second // How long to wait for election
)
+// HashRingConfig holds configuration for the consistent hash ring
+type HashRingConfig struct {
+// VirtualNodes is the number of virtual nodes per physical node (default: 150)
+VirtualNodes int
+}
+// DefaultHashRingConfig returns the default hash ring configuration
+func DefaultHashRingConfig() HashRingConfig {
+return HashRingConfig{
+VirtualNodes: DefaultVirtualNodes,
+}
+}
+// ShardConfig holds configuration for shard management
+type ShardConfig struct {
+// ShardCount is the total number of shards (default: 1024)
+ShardCount int
+// ReplicationFactor is the number of replicas per shard (default: 1)
+ReplicationFactor int
+}
+// DefaultShardConfig returns the default shard configuration
+func DefaultShardConfig() ShardConfig {
+return ShardConfig{
+ShardCount: DefaultNumShards,
+ReplicationFactor: 1,
+}
+}
// NodeStatus represents the health status of a node
type NodeStatus string
@@ -107,4 +137,3 @@ type LeadershipLease struct {
ExpiresAt time.Time `json:"expiresAt"`
StartedAt time.Time `json:"startedAt"`
}

store/config_test.go (new file, 46 lines)
View File

@@ -0,0 +1,46 @@
package store
import (
"testing"
"time"
)
func TestDefaultJetStreamConfig(t *testing.T) {
config := DefaultJetStreamConfig()
if config.StreamRetention != DefaultStreamRetention {
t.Errorf("expected StreamRetention=%v, got %v", DefaultStreamRetention, config.StreamRetention)
}
if config.ReplicaCount != DefaultReplicaCount {
t.Errorf("expected ReplicaCount=%d, got %d", DefaultReplicaCount, config.ReplicaCount)
}
}
func TestJetStreamConfigDefaults(t *testing.T) {
t.Run("default stream retention is 1 year", func(t *testing.T) {
expected := 365 * 24 * time.Hour
if DefaultStreamRetention != expected {
t.Errorf("expected DefaultStreamRetention=%v, got %v", expected, DefaultStreamRetention)
}
})
t.Run("default replica count is 1", func(t *testing.T) {
if DefaultReplicaCount != 1 {
t.Errorf("expected DefaultReplicaCount=1, got %d", DefaultReplicaCount)
}
})
}
func TestJetStreamConfigCustomValues(t *testing.T) {
config := JetStreamConfig{
StreamRetention: 30 * 24 * time.Hour, // 30 days
ReplicaCount: 3,
}
if config.StreamRetention != 30*24*time.Hour {
t.Errorf("expected StreamRetention=30 days, got %v", config.StreamRetention)
}
if config.ReplicaCount != 3 {
t.Errorf("expected ReplicaCount=3, got %d", config.ReplicaCount)
}
}

View File

@@ -11,29 +11,65 @@ import (
"github.com/nats-io/nats.go"
)
+// Default configuration values for JetStream event store
+const (
+DefaultStreamRetention = 365 * 24 * time.Hour // 1 year
+DefaultReplicaCount = 1
+)
+// JetStreamConfig holds configuration options for JetStreamEventStore
+type JetStreamConfig struct {
+// StreamRetention is how long to keep events (default: 1 year)
+StreamRetention time.Duration
+// ReplicaCount is the number of replicas for high availability (default: 1)
+ReplicaCount int
+}
+// DefaultJetStreamConfig returns the default configuration
+func DefaultJetStreamConfig() JetStreamConfig {
+return JetStreamConfig{
+StreamRetention: DefaultStreamRetention,
+ReplicaCount: DefaultReplicaCount,
+}
+}
// JetStreamEventStore implements EventStore using NATS JetStream for persistence
type JetStreamEventStore struct {
js nats.JetStreamContext
streamName string
+config JetStreamConfig
mu sync.Mutex // Protects version checks during SaveEvent
versions map[string]int64 // actorID -> latest version cache
}
-// NewJetStreamEventStore creates a new JetStream-based event store
+// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
+return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
+}
+// NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration
+func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) {
js, err := natsConn.JetStream()
if err != nil {
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
}
+// Apply defaults for zero values
+if config.StreamRetention == 0 {
+config.StreamRetention = DefaultStreamRetention
+}
+if config.ReplicaCount == 0 {
+config.ReplicaCount = DefaultReplicaCount
+}
// Create or update the stream
stream := &nats.StreamConfig{
Name: streamName,
Subjects: []string{fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName)},
Storage: nats.FileStorage,
Retention: nats.LimitsPolicy,
-MaxAge: 365 * 24 * time.Hour, // Keep events for 1 year
-Replicas: 1, // Can be increased for HA
+MaxAge: config.StreamRetention,
+Replicas: config.ReplicaCount,
}
_, err = js.AddStream(stream)
@@ -44,6 +80,7 @@ func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamE
return &JetStreamEventStore{
js: js,
streamName: streamName,
+config: config,
versions: make(map[string]int64),
}, nil
}
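A hedged usage sketch of the new store constructor — the NATS connection boilerplate, the stream name "orders", and the module import path are illustrative, not part of this diff:

package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"

	"example.com/project/store" // import path assumed for illustration
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Shorter retention and HA replication; zero fields would fall back
	// to DefaultStreamRetention / DefaultReplicaCount as shown above.
	es, err := store.NewJetStreamEventStoreWithConfig(nc, "orders", store.JetStreamConfig{
		StreamRetention: 30 * 24 * time.Hour, // 30 days instead of the 1-year default
		ReplicaCount:    3,                   // needs a 3-node JetStream cluster
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = es
}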