# DDD Patterns: Intended vs Actual Code

This document shows side-by-side comparisons of how the Cluster Coordination context should evolve from its current state to proper DDD patterns.

---

## Pattern 1: Commands vs Message Handlers

### Current (Anemic)

```go
// File: cluster/manager.go, line 141
func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
	var clusterMsg ClusterMessage
	if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil {
		cm.logger.Printf("Invalid cluster message: %v", err)
		return
	}

	switch clusterMsg.Type {
	case "rebalance":
		cm.handleRebalanceRequest(clusterMsg)
	case "migrate":
		cm.handleMigrationRequest(clusterMsg)
	case "node_update":
		if update, ok := clusterMsg.Payload.(NodeUpdate); ok {
			cm.handleNodeUpdate(update)
		}
	default:
		cm.logger.Printf("Unknown cluster message type: %s", clusterMsg.Type)
	}
}

// File: cluster/manager.go, line 163
func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	switch update.Type {
	case NodeJoined:
		cm.nodes[update.Node.ID] = update.Node
		cm.hashRing.AddNode(update.Node.ID)
		cm.logger.Printf("Node joined: %s", update.Node.ID)
	// ... (more cases)
	}
}
```

**Problems:**
- Generic message dispatch; unclear intent
- No explicit "command" concept
- No validation before state change
- No events published
- Tightly coupled to NATS message format

### Intended (DDD)

```go
// File: cluster/manager.go

// JoinCluster adds a node to the cluster (command)
func (cm *ClusterManager) JoinCluster(nodeInfo *NodeInfo) error {
	// Validate preconditions
	if nodeInfo.ID == "" {
		return fmt.Errorf("node ID cannot be empty")
	}
	if nodeInfo.Capacity <= 0 {
		return fmt.Errorf("node capacity must be > 0")
	}

	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	// Check duplicate
	if _, exists := cm.nodes[nodeInfo.ID]; exists {
		return fmt.Errorf("node already exists: %s", nodeInfo.ID)
	}

	// Execute command
	cm.nodes[nodeInfo.ID] = nodeInfo
	cm.hashRing.AddNode(nodeInfo.ID)

	// Publish event
	event := &NodeJoined{
		NodeID:    nodeInfo.ID,
		Address:   nodeInfo.Address,
		Port:      nodeInfo.Port,
		Capacity:  nodeInfo.Capacity,
		Timestamp: time.Now(),
	}
	if err := cm.eventPublisher.Publish(event); err != nil {
		cm.logger.Printf("Failed to publish NodeJoined event: %v", err)
		// Decide: rollback or proceed? (Usually proceed for CQRS)
	}

	// Trigger rebalancing if leader
	if cm.IsLeader() {
		go cm.triggerShardRebalancing("node_joined")
	}

	return nil
}

// MarkNodeFailed marks a node as failed (command)
func (cm *ClusterManager) MarkNodeFailed(nodeID string, reason string) error {
	// Validate
	cm.mutex.Lock()
	node, exists := cm.nodes[nodeID]
	if !exists {
		cm.mutex.Unlock()
		return fmt.Errorf("node not found: %s", nodeID)
	}
	if node.Status == NodeStatusFailed {
		cm.mutex.Unlock()
		return fmt.Errorf("node already failed: %s", nodeID)
	}

	// Execute command
	node.Status = NodeStatusFailed
	cm.hashRing.RemoveNode(nodeID)

	// Publish event
	event := &NodeFailed{
		NodeID:    nodeID,
		Reason:    reason,
		Timestamp: time.Now(),
	}
	if err := cm.eventPublisher.Publish(event); err != nil {
		cm.logger.Printf("Failed to publish NodeFailed event: %v", err)
	}

	isLeader := cm.IsLeader()
	cm.mutex.Unlock()

	// Trigger rebalancing if leader
	if isLeader {
		go cm.triggerShardRebalancing("node_failed")
	}

	return nil
}
```

**Benefits:**
- Explicit command methods with clear intent
- Validation before execution
- Atomic operations (lock held throughout)
- Events published on success
- Decoupled from message format (see the adapter sketch below)
- Testable in isolation
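With the commands in place, the existing NATS subscription can shrink to a thin adapter that only deserializes and delegates, which is what makes the domain transport-agnostic. A minimal sketch, reusing the `ClusterMessage`/`NodeUpdate` types and the `NodeJoined` update-type constant from the current handler; the exact wiring here is illustrative, not existing code:

```go
// File: cluster/nats_adapter.go (hypothetical sketch)

// handleClusterMessage only translates transport messages into commands;
// all business rules live in the command methods above.
func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
	var clusterMsg ClusterMessage
	if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil {
		cm.logger.Printf("Invalid cluster message: %v", err)
		return
	}

	switch clusterMsg.Type {
	case "node_update":
		update, ok := clusterMsg.Payload.(NodeUpdate)
		if !ok {
			cm.logger.Printf("Unexpected payload for node_update")
			return
		}
		if update.Type == NodeJoined {
			// Delegate to the explicit command; a rejection is just logged here.
			if err := cm.JoinCluster(update.Node); err != nil {
				cm.logger.Printf("JoinCluster rejected: %v", err)
			}
		}
	default:
		cm.logger.Printf("Unknown cluster message type: %s", clusterMsg.Type)
	}
}
```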
---

## Pattern 2: Value Objects vs Primitives

### Current (Scattered Types)

```go
// File: cluster/types.go, line 58
type NodeInfo struct {
	ID        string            `json:"id"`
	Address   string            `json:"address"`
	Port      int               `json:"port"`
	Status    NodeStatus        `json:"status"`
	Capacity  float64           `json:"capacity"`
	Load      float64           `json:"load"`
	LastSeen  time.Time         `json:"lastSeen"`
	Timestamp time.Time         `json:"timestamp"`
	Metadata  map[string]string `json:"metadata"`
	IsLeader  bool              `json:"isLeader"`
	VMCount   int               `json:"vmCount"`
	ShardIDs  []int             `json:"shardIds"`
}

// No validation, no methods
// Can create invalid nodes:
node := &NodeInfo{ID: "", Capacity: -5} // Invalid!
```

### Intended (Value Objects with Validation)

```go
// File: cluster/domain.go (new file)

// NodeID is a value object representing a unique node identifier
type NodeID struct {
	id string
}

// NewNodeID creates a NodeID, validating it's non-empty
func NewNodeID(id string) (NodeID, error) {
	if id == "" {
		return NodeID{}, errors.New("node ID cannot be empty")
	}
	return NodeID{id: id}, nil
}

// String returns the node ID as string
func (n NodeID) String() string {
	return n.id
}

// Equal checks equality
func (n NodeID) Equal(other NodeID) bool {
	return n.id == other.id
}

// Capacity is a value object representing node capacity
type Capacity struct {
	value float64
}

// NewCapacity creates a Capacity, validating it's positive
func NewCapacity(value float64) (Capacity, error) {
	if value <= 0 {
		return Capacity{}, fmt.Errorf("capacity must be > 0, got %f", value)
	}
	return Capacity{value: value}, nil
}

// Value returns capacity as float64
func (c Capacity) Value() float64 {
	return c.value
}

// NodeInfo is a value object (immutable after creation)
type NodeInfo struct {
	nodeID   NodeID
	address  string
	port     int
	status   NodeStatus
	capacity Capacity
	load     float64
	lastSeen time.Time
	metadata map[string]string
	isLeader bool
	vmCount  int
	shardIDs []int
}

// NewNodeInfo creates a NodeInfo with validation
func NewNodeInfo(
	id string,
	address string,
	port int,
	capacity float64,
) (*NodeInfo, error) {
	nodeID, err := NewNodeID(id)
	if err != nil {
		return nil, err
	}

	capVal, err := NewCapacity(capacity)
	if err != nil {
		return nil, err
	}

	if port < 1 || port > 65535 {
		return nil, fmt.Errorf("invalid port: %d", port)
	}

	return &NodeInfo{
		nodeID:   nodeID,
		address:  address,
		port:     port,
		status:   NodeStatusActive,
		capacity: capVal,
		load:     0,
		lastSeen: time.Now(),
		metadata: make(map[string]string),
		isLeader: false,
		vmCount:  0,
		shardIDs: []int{},
	}, nil
}

// Getters (all return copies to prevent mutation)
func (n *NodeInfo) NodeID() NodeID     { return n.nodeID }
func (n *NodeInfo) Address() string    { return n.address }
func (n *NodeInfo) Port() int          { return n.port }
func (n *NodeInfo) Status() NodeStatus { return n.status }
func (n *NodeInfo) Capacity() Capacity { return n.capacity }

// WithStatus returns a new NodeInfo with updated status
// (immutable pattern: create new instead of mutate)
func (n *NodeInfo) WithStatus(status NodeStatus) *NodeInfo {
	updated := *n
	updated.status = status
	return &updated
}

// WithLastSeen returns a new NodeInfo with updated last seen time
func (n *NodeInfo) WithLastSeen(t time.Time) *NodeInfo {
	updated := *n
	updated.lastSeen = t
	return &updated
}

// Equal checks value equality
func (n *NodeInfo) Equal(other *NodeInfo) bool {
	if other == nil {
		return false
	}
	return n.nodeID.Equal(other.nodeID) &&
		n.address == other.address &&
		n.port == other.port &&
		n.status == other.status &&
		n.capacity == other.capacity
}
```

**Benefits:**
- Impossible to create invalid nodes (caught at construction)
- Type safety (can't accidentally pass negative capacity)
- Immutable (prevents accidental mutations)
- Methods encapsulate behavior
- Easy to extend validation
- Copy-on-write pattern for updates (see the usage sketch below)
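A short usage sketch showing what the constructor validation and copy-on-write helpers above buy at call sites:

```go
// Invalid input is rejected at construction; the bad state never exists.
if _, err := NewNodeInfo("", "localhost", 8080, -5); err != nil {
	log.Printf("rejected: %v", err) // "node ID cannot be empty"
}

// Valid construction, then an immutable status change.
node, err := NewNodeInfo("node-1", "localhost", 8080, 1000)
if err != nil {
	log.Fatal(err)
}
failed := node.WithStatus(NodeStatusFailed)
fmt.Println(node.Status(), failed.Status()) // the original value is untouched
```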
---

## Pattern 3: Event Publishing

### Current (No Events)

```go
// File: cluster/manager.go
func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	switch update.Type {
	case NodeJoined:
		cm.nodes[update.Node.ID] = update.Node
		cm.hashRing.AddNode(update.Node.ID)
		cm.logger.Printf("Node joined: %s", update.Node.ID)
		// No event published
		// No way for other contexts to react
		// No audit trail
	}
}
```

### Intended (Events as First-Class)

```go
// File: cluster/events.go (new file)

// NodeJoined event indicates a node joined the cluster
type NodeJoined struct {
	NodeID    NodeID
	Address   string
	Port      int
	Capacity  Capacity
	Timestamp time.Time
}

// NodeFailed event indicates a node failed
type NodeFailed struct {
	NodeID    NodeID
	Reason    string // "HeartbeatTimeout", "AdminMarked", etc.
	Timestamp time.Time
}

// ShardAssigned event indicates shard assignment changed
type ShardAssigned struct {
	ShardID   int
	NodeIDs   []NodeID // [primary, replica1, replica2, ...]
	Version   uint64
	Timestamp time.Time
}

// ShardMigrated event indicates a shard moved from one node to another
type ShardMigrated struct {
	ShardID   int
	FromNodes []NodeID
	ToNodes   []NodeID
	Timestamp time.Time
}

// RebalancingTriggered event indicates rebalancing started
type RebalancingTriggered struct {
	LeaderID  NodeID
	Reason    string // "node_joined", "node_failed", "manual", "periodic"
	Timestamp time.Time
}

// EventPublisher is the interface for publishing domain events
type EventPublisher interface {
	// Publish publishes an event
	Publish(ctx context.Context, event interface{}) error
}

// File: cluster/manager.go (updated)
type ClusterManager struct {
	// ... existing fields ...
	eventPublisher EventPublisher // NEW
}

// publishEvent is a helper to publish events consistently
func (cm *ClusterManager) publishEvent(ctx context.Context, event interface{}) error {
	if cm.eventPublisher == nil {
		return nil // No-op if no publisher configured
	}
	return cm.eventPublisher.Publish(ctx, event)
}

// JoinCluster adds a node and publishes NodeJoined event
func (cm *ClusterManager) JoinCluster(ctx context.Context, nodeInfo *NodeInfo) error {
	// Validation...

	cm.mutex.Lock()
	cm.nodes[nodeInfo.NodeID().String()] = nodeInfo
	cm.hashRing.AddNode(nodeInfo.NodeID().String())
	cm.mutex.Unlock()

	// Publish event
	event := &NodeJoined{
		NodeID:    nodeInfo.NodeID(),
		Address:   nodeInfo.Address(),
		Port:      nodeInfo.Port(),
		Capacity:  nodeInfo.Capacity(),
		Timestamp: time.Now(),
	}
	return cm.publishEvent(ctx, event)
}

// MarkNodeFailed marks node as failed and publishes NodeFailed event
func (cm *ClusterManager) MarkNodeFailed(ctx context.Context, nodeID NodeID, reason string) error {
	// Validation...

	cm.mutex.Lock()
	cm.nodes[nodeID.String()] = cm.nodes[nodeID.String()].WithStatus(NodeStatusFailed)
	cm.hashRing.RemoveNode(nodeID.String())
	cm.mutex.Unlock()

	// Publish event
	event := &NodeFailed{
		NodeID:    nodeID,
		Reason:    reason,
		Timestamp: time.Now(),
	}
	return cm.publishEvent(ctx, event)
}
```

**Benefits:**
- Events are explicit domain concepts
- Type-safe (compiler enforces event structure)
- Published consistently (via publishEvent helper)
- Other contexts can subscribe and react
- Full audit trail available
- Enables event sourcing / CQRS
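The `EventPublisher` interface keeps transport details out of the domain; a concrete implementation lives at the edge. A minimal NATS-backed sketch (the `natsEventPublisher` type and the subject naming are illustrative, not existing code):

```go
// File: cluster/nats_publisher.go (hypothetical sketch)

// natsEventPublisher serializes domain events to JSON and publishes them
// on a per-event-type NATS subject, e.g. "cluster.events.NodeJoined".
type natsEventPublisher struct {
	conn          *nats.Conn
	subjectPrefix string
}

func NewNATSEventPublisher(conn *nats.Conn, subjectPrefix string) EventPublisher {
	return &natsEventPublisher{conn: conn, subjectPrefix: subjectPrefix}
}

func (p *natsEventPublisher) Publish(ctx context.Context, event interface{}) error {
	// Note: value objects with unexported fields (NodeID, Capacity) would
	// need MarshalJSON methods for this to round-trip cleanly.
	data, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("marshal event: %w", err)
	}

	// Derive the subject suffix from the concrete event type name.
	t := reflect.TypeOf(event)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	subject := fmt.Sprintf("%s.%s", p.subjectPrefix, t.Name())
	return p.conn.Publish(subject, data)
}
```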
---

## Pattern 4: Invariant Validation

### Current (Validation Scattered)

```go
// File: cluster/manager.go, line 191-197
func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
	// ...
	now := time.Now()
	for _, node := range cm.nodes {
		if now.Sub(node.LastSeen) > 90*time.Second && node.Status != NodeStatusFailed {
			node.Status = NodeStatusFailed
			cm.logger.Printf("Node marked as failed: %s", node.ID)
		}
	}
}

// File: cluster/manager.go, line 276-288
func (cm *ClusterManager) checkNodeHealth() {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	now := time.Now()
	for _, node := range cm.nodes {
		if now.Sub(node.LastSeen) > 90*time.Second && node.Status == NodeStatusActive {
			node.Status = NodeStatusFailed
			cm.logger.Printf("Node failed: %s", node.ID)
		}
	}
}

// Duplicate logic! Easy to miss cases.
// No central validation.
```

### Intended (Centralized Invariants)

```go
// File: cluster/invariants.go (new file)

// ClusterInvariants defines the consistency rules for the cluster
type ClusterInvariants struct {
	shardCount int
}

// NewClusterInvariants creates an invariant validator
func NewClusterInvariants(shardCount int) *ClusterInvariants {
	return &ClusterInvariants{shardCount: shardCount}
}

// ValidateNodeHealth checks Invariant 5: Leader is active
func (i *ClusterInvariants) ValidateNodeHealth(nodes map[string]*NodeInfo, leaderID string) error {
	if leaderID == "" {
		return nil // No leader yet, OK
	}

	leaderNode, exists := nodes[leaderID]
	if !exists {
		return fmt.Errorf("leader node %s not in cluster", leaderID)
	}
	if leaderNode.Status() != NodeStatusActive {
		return fmt.Errorf("leader node %s is not active (status: %v)", leaderID, leaderNode.Status())
	}
	return nil
}

// ValidateShardCoverage checks Invariant 2: All shards assigned
func (i *ClusterInvariants) ValidateShardCoverage(shardMap *ShardMap) error {
	if shardMap == nil {
		return errors.New("shard map is nil")
	}

	assignedShards := make(map[int]bool)
	for shardID := range shardMap.Shards {
		assignedShards[shardID] = true
	}

	for shardID := 0; shardID < i.shardCount; shardID++ {
		if !assignedShards[shardID] {
			return fmt.Errorf("shard %d is not assigned (orphaned)", shardID)
		}
	}
	return nil
}

// ValidateShardOwnership checks Invariant 3: Only healthy nodes own shards
func (i *ClusterInvariants) ValidateShardOwnership(shardMap *ShardMap) error {
	if shardMap == nil {
		return errors.New("shard map is nil")
	}

	for shardID, nodeIDs := range shardMap.Shards {
		for _, nodeID := range nodeIDs {
			nodeInfo, exists := shardMap.Nodes[nodeID.String()]
			if !exists {
				return fmt.Errorf("shard %d assigned to unknown node %s", shardID, nodeID)
			}
			if nodeInfo.Status() != NodeStatusActive {
				return fmt.Errorf("shard %d assigned to unhealthy node %s (status: %v)",
					shardID, nodeID, nodeInfo.Status())
			}
		}
	}
	return nil
}

// ValidateAll runs all invariant checks
func (i *ClusterInvariants) ValidateAll(topology *ClusterTopology) error {
	if err := i.ValidateNodeHealth(topology.nodes, topology.leaderID); err != nil {
		return fmt.Errorf("invariant violation (I5): %w", err)
	}
	if err := i.ValidateShardCoverage(topology.shardMap); err != nil {
		return fmt.Errorf("invariant violation (I2): %w", err)
	}
	if err := i.ValidateShardOwnership(topology.shardMap); err != nil {
		return fmt.Errorf("invariant violation (I3): %w", err)
	}
	return nil
}

// File: cluster/manager.go (updated)
type ClusterManager struct {
	// ... existing fields ...
	invariants *ClusterInvariants // NEW
}

// MarkNodeFailed marks node as failed with invariant checks
func (cm *ClusterManager) MarkNodeFailed(ctx context.Context, nodeID NodeID, reason string) error {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	// Validate preconditions
	node, exists := cm.nodes[nodeID.String()]
	if !exists {
		return fmt.Errorf("node not found: %s", nodeID)
	}
	if node.Status() == NodeStatusFailed {
		return fmt.Errorf("node already failed: %s", nodeID)
	}

	// Execute command
	failedNode := node.WithStatus(NodeStatusFailed)
	cm.nodes[nodeID.String()] = failedNode
	cm.hashRing.RemoveNode(nodeID.String())

	// Validate invariants still hold
	if err := cm.invariants.ValidateNodeHealth(cm.nodes, cm.currentLeaderID); err != nil {
		return fmt.Errorf("invariant violation after node failure: %w", err)
	}

	// Publish event
	event := &NodeFailed{
		NodeID:    nodeID,
		Reason:    reason,
		Timestamp: time.Now(),
	}
	_ = cm.publishEvent(ctx, event)

	return nil
}

// AssignShards assigns shards with invariant validation
func (cm *ClusterManager) AssignShards(ctx context.Context, newShardMap *ShardMap) error {
	// Only leader can assign
	if !cm.IsLeader() {
		return errors.New("only leader can assign shards")
	}

	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	// Validate preconditions
	if err := cm.invariants.ValidateAll(&ClusterTopology{
		nodes:    cm.nodes,
		shardMap: newShardMap,
		leaderID: cm.currentLeaderID,
	}); err != nil {
		return fmt.Errorf("cannot assign shards: %w", err)
	}

	// Execute command
	oldShardMap := cm.shardMap
	cm.shardMap = newShardMap

	// Publish events
	for shardID, newNodes := range newShardMap.Shards {
		oldNodes := oldShardMap.Shards[shardID]
		if !nodeListEqual(oldNodes, newNodes) {
			event := &ShardMigrated{
				ShardID:   shardID,
				FromNodes: oldNodes,
				ToNodes:   newNodes,
				Timestamp: time.Now(),
			}
			_ = cm.publishEvent(ctx, event)
		}
	}

	return nil
}
```

**Benefits:**
- Invariants defined in one place
- Easy to audit what's being validated
- Consistent application across all commands
- Clear error messages
- Testable in isolation
- Easy to add new invariants (see the sketch below)
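To illustrate how cheaply a new rule slots in, a hypothetical replication-factor check could sit next to the existing validators (this rule is illustrative and not one of the numbered invariants above):

```go
// ValidateReplication checks that every assigned shard has at least the
// expected number of replicas (hypothetical additional invariant).
func (i *ClusterInvariants) ValidateReplication(shardMap *ShardMap, replicationFactor int) error {
	if shardMap == nil {
		return errors.New("shard map is nil")
	}
	for shardID, nodeIDs := range shardMap.Shards {
		if len(nodeIDs) < replicationFactor {
			return fmt.Errorf("shard %d has %d replicas, want at least %d",
				shardID, len(nodeIDs), replicationFactor)
		}
	}
	return nil
}
```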
---

## Pattern 5: Rebalancing Strategy

### Current (Stubbed)

```go
// File: cluster/shard.go, line 210
func (chp *ConsistentHashPlacement) RebalanceShards(
	currentMap *ShardMap,
	nodes map[string]*NodeInfo,
) (*ShardMap, error) {
	// This is a simplified implementation
	// In practice, this would implement sophisticated rebalancing logic
	return currentMap, nil // BUG: Returns unchanged!
}
```

### Intended (Real Implementation)

```go
// File: cluster/rebalancing.go (new file)

// RebalancingStrategy defines how to distribute shards across nodes
type RebalancingStrategy interface {
	// Rebalance computes new shard assignments
	// Returns new ShardMap or error if unable to rebalance
	Rebalance(
		current *ShardMap,
		activeNodes map[string]*NodeInfo,
	) (*ShardMap, error)
}

// ConsistentHashRebalancer uses consistent hashing to minimize movements
type ConsistentHashRebalancer struct {
	virtualNodes int
	shardCount   int
}

// NewConsistentHashRebalancer creates a rebalancer
func NewConsistentHashRebalancer(virtualNodes, shardCount int) *ConsistentHashRebalancer {
	return &ConsistentHashRebalancer{
		virtualNodes: virtualNodes,
		shardCount:   shardCount,
	}
}

// Rebalance computes new assignments using consistent hashing
func (chr *ConsistentHashRebalancer) Rebalance(
	current *ShardMap,
	activeNodes map[string]*NodeInfo,
) (*ShardMap, error) {
	if len(activeNodes) == 0 {
		return nil, errors.New("no active nodes to rebalance to")
	}

	// Build new hash ring from active nodes
	ring := NewConsistentHashRingWithConfig(HashRingConfig{
		VirtualNodes: chr.virtualNodes,
	})
	for nodeID := range activeNodes {
		ring.AddNode(nodeID)
	}

	// Reassign each shard via consistent hash
	newAssignments := make(map[int][]string)
	for shardID := 0; shardID < chr.shardCount; shardID++ {
		// Primary node via consistent hash
		primaryNode := ring.GetNode(fmt.Sprintf("shard-%d", shardID))
		if primaryNode == "" {
			return nil, fmt.Errorf("no node assigned for shard %d", shardID)
		}
		// TODO: Add replicas (for now: single replica)
		newAssignments[shardID] = []string{primaryNode}
	}

	return &ShardMap{
		Version:    current.Version + 1,
		Shards:     newAssignments,
		Nodes:      activeNodes,
		UpdateTime: time.Now(),
	}, nil
}

// LoadBalancingRebalancer assigns based on current load (future strategy)
type LoadBalancingRebalancer struct {
	shardCount int
}

// Rebalance assigns shards to least-loaded nodes
func (lbr *LoadBalancingRebalancer) Rebalance(
	current *ShardMap,
	activeNodes map[string]*NodeInfo,
) (*ShardMap, error) {
	// Sort nodes by load
	type nodeLoad struct {
		id   string
		load float64
	}
	var nodes []nodeLoad
	for id, node := range activeNodes {
		nodes = append(nodes, nodeLoad{id, node.Load})
	}
	sort.Slice(nodes, func(i, j int) bool {
		return nodes[i].load < nodes[j].load
	})

	// Assign each shard to least-loaded node
	newAssignments := make(map[int][]string)
	for shardID := 0; shardID < lbr.shardCount; shardID++ {
		// Round-robin through sorted nodes
		idx := shardID % len(nodes)
		newAssignments[shardID] = []string{nodes[idx].id}
	}

	return &ShardMap{
		Version:    current.Version + 1,
		Shards:     newAssignments,
		Nodes:      activeNodes,
		UpdateTime: time.Now(),
	}, nil
}

// File: cluster/manager.go (updated)

// RebalanceShards coordinates rebalancing
func (cm *ClusterManager) RebalanceShards(ctx context.Context, reason string) error {
	if !cm.IsLeader() {
		return errors.New("only leader can rebalance")
	}

	cm.mutex.Lock()

	// Get active nodes
	activeNodes := make(map[string]*NodeInfo)
	for id, node := range cm.nodes {
		if node.Status() == NodeStatusActive {
			activeNodes[id] = node
		}
	}
	if len(activeNodes) == 0 {
		cm.mutex.Unlock()
		return errors.New("no active nodes to rebalance to")
	}

	// Publish rebalancing started
	startEvent := &RebalancingTriggered{
		LeaderID:  NodeID{id: cm.currentLeaderID},
		Reason:    reason,
		Timestamp: time.Now(),
	}
	_ = cm.publishEvent(ctx, startEvent)

	// Compute new assignments
	strategy := NewConsistentHashRebalancer(DefaultVirtualNodes, DefaultNumShards)
	newShardMap, err := strategy.Rebalance(cm.shardMap, activeNodes)
	if err != nil {
		cm.mutex.Unlock()
		return fmt.Errorf("rebalancing strategy failed: %w", err)
	}

	// Validate new assignments
	if err := cm.invariants.ValidateAll(&ClusterTopology{
		nodes:    cm.nodes,
		shardMap: newShardMap,
		leaderID: cm.currentLeaderID,
	}); err != nil {
		cm.mutex.Unlock()
		return fmt.Errorf("new shard map violates invariants: %w", err)
	}

	// Apply new assignments
	oldShardMap := cm.shardMap
	cm.shardMap = newShardMap

	migratedCount := 0
	for shardID, newNodes := range newShardMap.Shards {
		oldNodes := oldShardMap.Shards[shardID]
		if !nodeListEqual(oldNodes, newNodes) {
			migratedCount++
			// Publish event for each migration
			event := &ShardMigrated{
				ShardID:   shardID,
				FromNodes: stringListToNodeIDList(oldNodes),
				ToNodes:   stringListToNodeIDList(newNodes),
				Timestamp: time.Now(),
			}
			_ = cm.publishEvent(ctx, event)
		}
	}

	cm.mutex.Unlock()

	// Publish rebalancing completed
	completeEvent := &RebalancingCompleted{
		LeaderID:      NodeID{id: cm.currentLeaderID},
		MigratedCount: migratedCount,
		CompletedAt:   time.Now(),
	}
	_ = cm.publishEvent(ctx, completeEvent)

	return nil
}
```

**Benefits:**
- Strategy pattern allows multiple algorithms
- Real rebalancing actually redistributes shards
- New strategies can be plugged in (e.g., load-aware)
- Invariants checked before applying
- Events published for observability
- Testable in isolation
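`RebalanceShards` above references a few helpers and an event type that are not shown elsewhere in this document. Minimal sketches, assuming the `[]string` shard assignments produced by the rebalancers (Pattern 4's `AssignShards` would need a `[]NodeID` equivalent):

```go
// nodeListEqual reports whether two shard assignments are identical
// (order matters: index 0 is the primary).
func nodeListEqual(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// stringListToNodeIDList converts raw node ID strings into NodeID value objects.
func stringListToNodeIDList(ids []string) []NodeID {
	out := make([]NodeID, 0, len(ids))
	for _, id := range ids {
		nodeID, err := NewNodeID(id)
		if err != nil {
			continue // skip malformed IDs; upstream validation should prevent this
		}
		out = append(out, nodeID)
	}
	return out
}

// RebalancingCompleted is published at the end of RebalanceShards but is not
// defined in events.go above; a matching sketch:
type RebalancingCompleted struct {
	LeaderID      NodeID
	MigratedCount int
	CompletedAt   time.Time
}
```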
---

## Pattern 6: Testing Aggregates

### Current (Hard to Test)

```go
// Testing is difficult because:
// 1. No dependency injection (NATS, KV store hardcoded)
// 2. No way to verify events (none published)
// 3. No way to inject clock (time.Now() hardcoded)
// 4. All state is private; hard to assert

func TestClusterManager_JoinNode(t *testing.T) {
	// Can't create without real NATS connection!
	natsConn, _ := nats.Connect(nats.DefaultURL)
	defer natsConn.Close()

	ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
	cm, _ := NewClusterManager("node-1", natsConn, ctx)

	// Can't control time
	// Can't verify events
	// Can't assert invariants
}
```

### Intended (Testable with Mocks)

```go
// File: cluster/manager_test.go

// MockEventPublisher captures published events for testing
type MockEventPublisher struct {
	events []interface{}
	mu     sync.Mutex
}

func (m *MockEventPublisher) Publish(ctx context.Context, event interface{}) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.events = append(m.events, event)
	return nil
}

func (m *MockEventPublisher) GetEvents(t *testing.T) []interface{} {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.events
}

func (m *MockEventPublisher) Clear() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.events = []interface{}{}
}

// MockClock allows controlling time in tests
type MockClock struct {
	now time.Time
}

func (mc *MockClock) Now() time.Time {
	return mc.now
}

func (mc *MockClock) Advance(d time.Duration) {
	mc.now = mc.now.Add(d)
}

// ClusterManager gains an injectable clock
type ClusterManager struct {
	// ... existing fields ...
	clock Clock // NEW
}

type Clock interface {
	Now() time.Time
}

// Test: JoinCluster publishes NodeJoined event
func TestClusterManager_JoinCluster_PublishesEvent(t *testing.T) {
	// Arrange
	publisher := &MockEventPublisher{}
	cm := &ClusterManager{
		nodes:          make(map[string]*NodeInfo),
		hashRing:       NewConsistentHashRing(),
		eventPublisher: publisher,
		invariants:     NewClusterInvariants(1024),
	}
	nodeInfo, _ := NewNodeInfo("node-1", "localhost", 8080, 1000)

	// Act
	ctx := context.Background()
	err := cm.JoinCluster(ctx, nodeInfo)

	// Assert
	if err != nil {
		t.Fatalf("JoinCluster failed: %v", err)
	}

	events := publisher.GetEvents(t)
	if len(events) != 1 {
		t.Fatalf("expected 1 event, got %d", len(events))
	}
	joinedEvent, ok := events[0].(*NodeJoined)
	if !ok {
		t.Fatalf("expected NodeJoined event, got %T", events[0])
	}
	if joinedEvent.NodeID.String() != "node-1" {
		t.Errorf("expected node-1, got %s", joinedEvent.NodeID)
	}
}

// Test: MarkNodeFailed with invariant violation
func TestClusterManager_MarkNodeFailed_ValidatesInvariants(t *testing.T) {
	// Arrange
	publisher := &MockEventPublisher{}
	cm := &ClusterManager{
		nodes:           make(map[string]*NodeInfo),
		hashRing:        NewConsistentHashRing(),
		eventPublisher:  publisher,
		currentLeaderID: "node-1",
		invariants:      NewClusterInvariants(1024),
	}

	// Only one node: the leader
	node1, _ := NewNodeInfo("node-1", "localhost", 8080, 1000)
	cm.nodes["node-1"] = node1

	// Act: Try to fail the only (leader) node
	ctx := context.Background()
	nodeID, _ := NewNodeID("node-1")
	err := cm.MarkNodeFailed(ctx, nodeID, "test")

	// Assert: Should fail because it violates Invariant 5 (leader must be active)
	if err == nil {
		t.Fatal("expected error when failing leader, got nil")
	}
	if !strings.Contains(err.Error(), "invariant") {
		t.Errorf("expected invariant error, got: %v", err)
	}
}

// Test: Rebalance uses strategy to compute assignments
func TestClusterManager_RebalanceShards_UsesStrategy(t *testing.T) {
	// Arrange
	publisher := &MockEventPublisher{}
	cm := &ClusterManager{
		nodes:           make(map[string]*NodeInfo),
		hashRing:        NewConsistentHashRing(),
		shardMap:        &ShardMap{Shards: make(map[int][]string)},
		currentLeaderID: "node-1",
		eventPublisher:  publisher,
		invariants:      NewClusterInvariants(DefaultNumShards),
	}

	// Add nodes
	for i := 1; i <= 2; i++ {
		id := fmt.Sprintf("node-%d", i)
		node, _ := NewNodeInfo(id, "localhost", 8080+i, 1000)
		cm.nodes[id] = node
		cm.hashRing.AddNode(id)
	}

	// Act: Rebalance
	ctx := context.Background()
	err := cm.RebalanceShards(ctx, "test")

	// Assert
	if err != nil {
		t.Fatalf("RebalanceShards failed: %v", err)
	}

	// Check that every shard is now assigned
	// (RebalanceShards uses the package default shard count)
	assignedCount := len(cm.shardMap.Shards)
	if assignedCount != DefaultNumShards {
		t.Errorf("expected %d shards assigned, got %d", DefaultNumShards, assignedCount)
	}

	// Check that events were published
	events := publisher.GetEvents(t)
	hasShardMigrated := false
	for _, event := range events {
		if _, ok := event.(*ShardMigrated); ok {
			hasShardMigrated = true
			break
		}
	}
	if !hasShardMigrated {
		t.Error("expected at least one ShardMigrated event")
	}
}
```

**Benefits:**
- Dependency injection (publisher, clock, strategy; clock usage is sketched below)
- Easy to verify events
- Can test invariant validation
- Can test without NATS
- Clear, maintainable tests
- Behavior-focused (what happened, not how)
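The `MockClock` above is defined but not exercised by the tests shown. A sketch of how it would be used, assuming the commands are updated to stamp events via `cm.clock.Now()` instead of `time.Now()` (that wiring is an assumption, not shown above):

```go
// Test: event timestamps come from the injected clock (assumes JoinCluster
// reads cm.clock.Now() when a clock is configured)
func TestClusterManager_JoinCluster_UsesInjectedClock(t *testing.T) {
	publisher := &MockEventPublisher{}
	clock := &MockClock{now: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)}
	cm := &ClusterManager{
		nodes:          make(map[string]*NodeInfo),
		hashRing:       NewConsistentHashRing(),
		eventPublisher: publisher,
		invariants:     NewClusterInvariants(1024),
		clock:          clock,
	}

	nodeInfo, _ := NewNodeInfo("node-1", "localhost", 8080, 1000)
	if err := cm.JoinCluster(context.Background(), nodeInfo); err != nil {
		t.Fatalf("JoinCluster failed: %v", err)
	}

	joined, ok := publisher.GetEvents(t)[0].(*NodeJoined)
	if !ok {
		t.Fatal("expected NodeJoined event")
	}
	if !joined.Timestamp.Equal(clock.Now()) {
		t.Errorf("expected timestamp from injected clock, got %v", joined.Timestamp)
	}
}
```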
---

## Summary: Key Patterns to Adopt

| Pattern | Current | Intended | Benefit |
|---------|---------|----------|---------|
| Commands | Message handlers | Explicit methods | Clear intent |
| Events | None published | First-class domain events | Event-driven, auditable |
| Validation | Scattered | Centralized invariants | Consistent, testable |
| Immutability | Mutable state | Value objects, copy-on-write | Prevents bugs |
| Strategy | Stubbed | Real implementation | Actually works |
| Testing | Hard (coupled) | Dependency injection, mocks | Easy, comprehensive |

---

## References

- [DOMAIN_MODEL.md](./DOMAIN_MODEL.md) - Full domain model
- [REFACTORING_SUMMARY.md](./REFACTORING_SUMMARY.md) - Implementation roadmap
- [manager.go](./manager.go) - Current implementation
- [leader.go](./leader.go) - LeaderElection implementation