# DDD Patterns: Intended vs Actual Code

This document shows side-by-side comparisons of how the Cluster Coordination context should evolve from its current state to proper DDD patterns.

---

## Pattern 1: Commands vs Message Handlers

### Current (Anemic)

```go
// File: cluster/manager.go, line 141
func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) {
	var clusterMsg ClusterMessage
	if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil {
		cm.logger.Printf("Invalid cluster message: %v", err)
		return
	}

	switch clusterMsg.Type {
	case "rebalance":
		cm.handleRebalanceRequest(clusterMsg)
	case "migrate":
		cm.handleMigrationRequest(clusterMsg)
	case "node_update":
		if update, ok := clusterMsg.Payload.(NodeUpdate); ok {
			cm.handleNodeUpdate(update)
		}
	default:
		cm.logger.Printf("Unknown cluster message type: %s", clusterMsg.Type)
	}
}

// File: cluster/manager.go, line 163
func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	switch update.Type {
	case NodeJoined:
		cm.nodes[update.Node.ID] = update.Node
		cm.hashRing.AddNode(update.Node.ID)
		cm.logger.Printf("Node joined: %s", update.Node.ID)
	// ... (more cases)
	}
}
```

**Problems:**
- Generic message dispatch; unclear intent
- No explicit "command" concept
- No validation before state change
- No events published
- Tightly coupled to NATS message format

### Intended (DDD)

```go
// File: cluster/manager.go

// JoinCluster adds a node to the cluster (command)
func (cm *ClusterManager) JoinCluster(nodeInfo *NodeInfo) error {
	// Validate preconditions
	if nodeInfo.ID == "" {
		return fmt.Errorf("node ID cannot be empty")
	}
	if nodeInfo.Capacity <= 0 {
		return fmt.Errorf("node capacity must be > 0")
	}

	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	// Check duplicate
	if _, exists := cm.nodes[nodeInfo.ID]; exists {
		return fmt.Errorf("node already exists: %s", nodeInfo.ID)
	}

	// Execute command
	cm.nodes[nodeInfo.ID] = nodeInfo
	cm.hashRing.AddNode(nodeInfo.ID)

	// Publish event
	event := &NodeJoined{
		NodeID:    nodeInfo.ID,
		Address:   nodeInfo.Address,
		Port:      nodeInfo.Port,
		Capacity:  nodeInfo.Capacity,
		Timestamp: time.Now(),
	}
	if err := cm.eventPublisher.Publish(event); err != nil {
		cm.logger.Printf("Failed to publish NodeJoined event: %v", err)
		// Decide: rollback or proceed? (Usually proceed for CQRS)
	}

	// Trigger rebalancing if leader
	if cm.IsLeader() {
		go cm.triggerShardRebalancing("node_joined")
	}

	return nil
}

// MarkNodeFailed marks a node as failed (command)
func (cm *ClusterManager) MarkNodeFailed(nodeID string, reason string) error {
	// Validate
	cm.mutex.Lock()
	node, exists := cm.nodes[nodeID]
	if !exists {
		cm.mutex.Unlock()
		return fmt.Errorf("node not found: %s", nodeID)
	}
	if node.Status == NodeStatusFailed {
		cm.mutex.Unlock()
		return fmt.Errorf("node already failed: %s", nodeID)
	}

	// Execute command
	node.Status = NodeStatusFailed
	cm.hashRing.RemoveNode(nodeID)

	// Publish event
	event := &NodeFailed{
		NodeID:    nodeID,
		Reason:    reason,
		Timestamp: time.Now(),
	}
	if err := cm.eventPublisher.Publish(event); err != nil {
		cm.logger.Printf("Failed to publish NodeFailed event: %v", err)
	}

	isLeader := cm.IsLeader()
	cm.mutex.Unlock()

	// Trigger rebalancing if leader
	if isLeader {
		go cm.triggerShardRebalancing("node_failed")
	}

	return nil
}
```

**Benefits:**
- Explicit command methods with clear intent
- Validation before execution
- Atomic operations (lock held throughout)
- Events published on success
- Decoupled from message format (see the adapter sketch below)
- Testable in isolation
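
With commands in place, the NATS subscription can shrink to a thin adapter that only translates messages into command calls. The following is a hypothetical sketch rather than code from this repository: the `NATSCommandAdapter` type, the `"join"` message type, and the `json.RawMessage` payload field are assumptions, and imports are elided as in the other snippets.

```go
// File: cluster/nats_adapter.go (hypothetical sketch)

// joinPayload is an assumed wire format for a join request.
type joinPayload struct {
	ID       string  `json:"id"`
	Address  string  `json:"address"`
	Port     int     `json:"port"`
	Capacity float64 `json:"capacity"`
}

// NATSCommandAdapter translates transport messages into explicit domain commands,
// so ClusterManager never depends on NATS types.
type NATSCommandAdapter struct {
	manager *ClusterManager
	logger  *log.Logger
}

// HandleClusterMessage is the NATS subscription callback.
func (a *NATSCommandAdapter) HandleClusterMessage(msg *nats.Msg) {
	var envelope struct {
		Type    string          `json:"type"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(msg.Data, &envelope); err != nil {
		a.logger.Printf("invalid cluster message: %v", err)
		return
	}

	switch envelope.Type {
	case "join": // assumed message type
		var p joinPayload
		if err := json.Unmarshal(envelope.Payload, &p); err != nil {
			a.logger.Printf("invalid join payload: %v", err)
			return
		}
		// Translate into the explicit command; all validation stays in the domain.
		nodeInfo := &NodeInfo{ID: p.ID, Address: p.Address, Port: p.Port, Capacity: p.Capacity}
		if err := a.manager.JoinCluster(nodeInfo); err != nil {
			a.logger.Printf("JoinCluster rejected: %v", err)
		}
	default:
		a.logger.Printf("unknown cluster message type: %s", envelope.Type)
	}
}
```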

---

## Pattern 2: Value Objects vs Primitives

### Current (Scattered Types)

```go
// File: cluster/types.go, line 58
type NodeInfo struct {
	ID        string            `json:"id"`
	Address   string            `json:"address"`
	Port      int               `json:"port"`
	Status    NodeStatus        `json:"status"`
	Capacity  float64           `json:"capacity"`
	Load      float64           `json:"load"`
	LastSeen  time.Time         `json:"lastSeen"`
	Timestamp time.Time         `json:"timestamp"`
	Metadata  map[string]string `json:"metadata"`
	IsLeader  bool              `json:"isLeader"`
	VMCount   int               `json:"vmCount"`
	ShardIDs  []int             `json:"shardIds"`
}

// No validation, no methods
// Can create invalid nodes:
node := &NodeInfo{ID: "", Capacity: -5} // Invalid!
```

### Intended (Value Objects with Validation)

```go
// File: cluster/domain.go (new file)

// NodeID is a value object representing a unique node identifier
type NodeID struct {
	id string
}

// NewNodeID creates a NodeID, validating it's non-empty
func NewNodeID(id string) (NodeID, error) {
	if id == "" {
		return NodeID{}, errors.New("node ID cannot be empty")
	}
	return NodeID{id: id}, nil
}

// String returns the node ID as string
func (n NodeID) String() string {
	return n.id
}

// Equal checks equality
func (n NodeID) Equal(other NodeID) bool {
	return n.id == other.id
}

// Capacity is a value object representing node capacity
type Capacity struct {
	value float64
}

// NewCapacity creates a Capacity, validating it's positive
func NewCapacity(value float64) (Capacity, error) {
	if value <= 0 {
		return Capacity{}, fmt.Errorf("capacity must be > 0, got %f", value)
	}
	return Capacity{value: value}, nil
}

// Value returns capacity as float64
func (c Capacity) Value() float64 {
	return c.value
}

// NodeInfo is a value object (immutable after creation)
type NodeInfo struct {
	nodeID   NodeID
	address  string
	port     int
	status   NodeStatus
	capacity Capacity
	load     float64
	lastSeen time.Time
	metadata map[string]string
	isLeader bool
	vmCount  int
	shardIDs []int
}

// NewNodeInfo creates a NodeInfo with validation
func NewNodeInfo(
	id string,
	address string,
	port int,
	capacity float64,
) (*NodeInfo, error) {
	nodeID, err := NewNodeID(id)
	if err != nil {
		return nil, err
	}

	cap, err := NewCapacity(capacity)
	if err != nil {
		return nil, err
	}

	if port < 1 || port > 65535 {
		return nil, fmt.Errorf("invalid port: %d", port)
	}

	return &NodeInfo{
		nodeID:   nodeID,
		address:  address,
		port:     port,
		status:   NodeStatusActive,
		capacity: cap,
		load:     0,
		lastSeen: time.Now(),
		metadata: make(map[string]string),
		isLeader: false,
		vmCount:  0,
		shardIDs: []int{},
	}, nil
}

// Getters (all return copies to prevent mutation)
func (n *NodeInfo) NodeID() NodeID       { return n.nodeID }
func (n *NodeInfo) Address() string      { return n.address }
func (n *NodeInfo) Port() int            { return n.port }
func (n *NodeInfo) Status() NodeStatus   { return n.status }
func (n *NodeInfo) Capacity() Capacity   { return n.capacity }

// WithStatus returns a new NodeInfo with updated status
// (immutable pattern: create new instead of mutate)
func (n *NodeInfo) WithStatus(status NodeStatus) *NodeInfo {
	copy := *n
	copy.status = status
	return &copy
}

// WithLastSeen returns a new NodeInfo with updated last seen time
func (n *NodeInfo) WithLastSeen(t time.Time) *NodeInfo {
	copy := *n
	copy.lastSeen = t
	return &copy
}

// Equal checks value equality
func (n *NodeInfo) Equal(other *NodeInfo) bool {
	if other == nil {
		return false
	}
	return n.nodeID.Equal(other.nodeID) &&
		n.address == other.address &&
		n.port == other.port &&
		n.status == other.status &&
		n.capacity == other.capacity
}
```

**Benefits:**
- Impossible to create invalid nodes (caught at construction)
- Type safety (can't accidentally pass negative capacity)
- Immutable (prevents accidental mutations)
- Methods encapsulate behavior
- Easy to extend validation
- Copy-on-write pattern for updates (see the usage sketch below)
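
A brief usage sketch of the constructors and copy-on-write updates above; the `registerNode` function and its values are hypothetical, and only the types defined in this pattern are assumed.

```go
// Hypothetical usage of the value objects above.
func registerNode() error {
	// Construction validates, so invalid nodes never enter the system.
	node, err := NewNodeInfo("node-1", "10.0.0.5", 8080, 1000)
	if err != nil {
		return fmt.Errorf("rejecting invalid node: %w", err)
	}

	// Invalid input is caught at the boundary instead of propagating silently.
	if _, err := NewNodeInfo("", "10.0.0.6", 8081, -5); err != nil {
		log.Printf("construction failed as expected: %v", err)
	}

	// Updates return a new value; the original is never mutated.
	failed := node.WithStatus(NodeStatusFailed)
	log.Printf("original status: %v, updated status: %v", node.Status(), failed.Status())
	return nil
}
```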

---

## Pattern 3: Event Publishing

### Current (No Events)

```go
// File: cluster/manager.go
func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	switch update.Type {
	case NodeJoined:
		cm.nodes[update.Node.ID] = update.Node
		cm.hashRing.AddNode(update.Node.ID)
		cm.logger.Printf("Node joined: %s", update.Node.ID)
		// No event published
		// No way for other contexts to react
		// No audit trail
	}
}
```

### Intended (Events as First-Class)

```go
// File: cluster/events.go (new file)

// NodeJoined event indicates a node joined the cluster
type NodeJoined struct {
	NodeID    NodeID
	Address   string
	Port      int
	Capacity  Capacity
	Timestamp time.Time
}

// NodeFailed event indicates a node failed
type NodeFailed struct {
	NodeID    NodeID
	Reason    string // "HeartbeatTimeout", "AdminMarked", etc.
	Timestamp time.Time
}

// ShardAssigned event indicates shard assignment changed
type ShardAssigned struct {
	ShardID   int
	NodeIDs   []NodeID // [primary, replica1, replica2, ...]
	Version   uint64
	Timestamp time.Time
}

// ShardMigrated event indicates a shard moved from one node to another
type ShardMigrated struct {
	ShardID   int
	FromNodes []NodeID
	ToNodes   []NodeID
	Timestamp time.Time
}

// RebalancingTriggered event indicates rebalancing started
type RebalancingTriggered struct {
	LeaderID  NodeID
	Reason    string // "node_joined", "node_failed", "manual", "periodic"
	Timestamp time.Time
}

// EventPublisher is the interface for publishing domain events
type EventPublisher interface {
	// Publish publishes an event
	Publish(ctx context.Context, event interface{}) error
}

// File: cluster/manager.go (updated)

type ClusterManager struct {
	// ... existing fields ...
	eventPublisher EventPublisher // NEW
}

// publishEvent is a helper to publish events consistently
func (cm *ClusterManager) publishEvent(ctx context.Context, event interface{}) error {
	if cm.eventPublisher == nil {
		return nil // No-op if no publisher configured
	}
	return cm.eventPublisher.Publish(ctx, event)
}

// JoinCluster adds a node and publishes NodeJoined event
func (cm *ClusterManager) JoinCluster(ctx context.Context, nodeInfo *NodeInfo) error {
	// Validation...

	cm.mutex.Lock()
	cm.nodes[nodeInfo.NodeID().String()] = nodeInfo
	cm.hashRing.AddNode(nodeInfo.NodeID().String())
	cm.mutex.Unlock()

	// Publish event
	event := &NodeJoined{
		NodeID:    nodeInfo.NodeID(),
		Address:   nodeInfo.Address(),
		Port:      nodeInfo.Port(),
		Capacity:  nodeInfo.Capacity(),
		Timestamp: time.Now(),
	}
	return cm.publishEvent(ctx, event)
}

// MarkNodeFailed marks node as failed and publishes NodeFailed event
func (cm *ClusterManager) MarkNodeFailed(ctx context.Context, nodeID NodeID, reason string) error {
	// Validation...

	cm.mutex.Lock()
	cm.nodes[nodeID.String()] = cm.nodes[nodeID.String()].WithStatus(NodeStatusFailed)
	cm.hashRing.RemoveNode(nodeID.String())
	cm.mutex.Unlock()

	// Publish event
	event := &NodeFailed{
		NodeID:    nodeID,
		Reason:    reason,
		Timestamp: time.Now(),
	}
	return cm.publishEvent(ctx, event)
}
```

**Benefits:**
- Events are explicit domain concepts
- Type-safe (compiler enforces event structure)
- Published consistently (via publishEvent helper)
- Other contexts can subscribe and react
- Full audit trail available
- Enables event sourcing / CQRS
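
The EventPublisher interface leaves the transport open. One possible NATS-backed implementation is sketched below; the `NATSEventPublisher` type, the subject naming scheme, and the use of reflection for the event name are assumptions rather than code from this repository, and the value objects would additionally need `json.Marshaler` implementations for their unexported fields to serialize meaningfully.

```go
// File: cluster/nats_event_publisher.go (hypothetical sketch)

// NATSEventPublisher publishes domain events as JSON on per-type NATS subjects,
// e.g. "cluster.events.NodeJoined".
type NATSEventPublisher struct {
	conn          *nats.Conn
	subjectPrefix string // assumed, e.g. "cluster.events"
}

// Publish marshals the event and publishes it; core NATS publishing does not
// take a context, so ctx is accepted only to satisfy the interface.
func (p *NATSEventPublisher) Publish(ctx context.Context, event interface{}) error {
	data, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("marshal event: %w", err)
	}

	t := reflect.TypeOf(event)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	subject := fmt.Sprintf("%s.%s", p.subjectPrefix, t.Name())

	if err := p.conn.Publish(subject, data); err != nil {
		return fmt.Errorf("publish %s: %w", subject, err)
	}
	return nil
}
```

Another context could then subscribe with a wildcard subject such as `cluster.events.>` and react to NodeFailed or ShardMigrated without touching the Cluster Coordination code.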

---

## Pattern 4: Invariant Validation

### Current (Validation Scattered)

```go
// File: cluster/manager.go, line 191-197
func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) {
	// ...
	now := time.Now()
	for _, node := range cm.nodes {
		if now.Sub(node.LastSeen) > 90*time.Second && node.Status != NodeStatusFailed {
			node.Status = NodeStatusFailed
			cm.logger.Printf("Node marked as failed: %s", node.ID)
		}
	}
}

// File: cluster/manager.go, line 276-288
func (cm *ClusterManager) checkNodeHealth() {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	now := time.Now()
	for _, node := range cm.nodes {
		if now.Sub(node.LastSeen) > 90*time.Second && node.Status == NodeStatusActive {
			node.Status = NodeStatusFailed
			cm.logger.Printf("Node failed: %s", node.ID)
		}
	}
}

// Duplicate logic! Easy to miss cases.
// No central validation.
```

### Intended (Centralized Invariants)

```go
// File: cluster/invariants.go (new file)

// ClusterInvariants defines the consistency rules for the cluster
type ClusterInvariants struct {
	shardCount int
}

// NewClusterInvariants creates an invariant validator
func NewClusterInvariants(shardCount int) *ClusterInvariants {
	return &ClusterInvariants{shardCount: shardCount}
}

// ValidateNodeHealth checks Invariant 5: Leader is active
func (i *ClusterInvariants) ValidateNodeHealth(nodes map[string]*NodeInfo, leaderID string) error {
	if leaderID == "" {
		return nil // No leader yet, OK
	}

	leaderNode, exists := nodes[leaderID]
	if !exists {
		return fmt.Errorf("leader node %s not in cluster", leaderID)
	}

	if leaderNode.Status() != NodeStatusActive {
		return fmt.Errorf("leader node %s is not active (status: %v)", leaderID, leaderNode.Status())
	}

	return nil
}

// ValidateShardCoverage checks Invariant 2: All shards assigned
func (i *ClusterInvariants) ValidateShardCoverage(shardMap *ShardMap) error {
	if shardMap == nil {
		return errors.New("shard map is nil")
	}

	assignedShards := make(map[int]bool)
	for shardID := range shardMap.Shards {
		assignedShards[shardID] = true
	}

	for shardID := 0; shardID < i.shardCount; shardID++ {
		if !assignedShards[shardID] {
			return fmt.Errorf("shard %d is not assigned (orphaned)", shardID)
		}
	}

	return nil
}

// ValidateShardOwnership checks Invariant 3: Only healthy nodes own shards
func (i *ClusterInvariants) ValidateShardOwnership(shardMap *ShardMap) error {
	if shardMap == nil {
		return errors.New("shard map is nil")
	}

	for shardID, nodeIDs := range shardMap.Shards {
		for _, nodeID := range nodeIDs {
			nodeInfo, exists := shardMap.Nodes[nodeID.String()]
			if !exists {
				return fmt.Errorf("shard %d assigned to unknown node %s", shardID, nodeID)
			}

			if nodeInfo.Status() != NodeStatusActive {
				return fmt.Errorf("shard %d assigned to unhealthy node %s (status: %v)",
					shardID, nodeID, nodeInfo.Status())
			}
		}
	}

	return nil
}

// ValidateAll runs all invariant checks
func (i *ClusterInvariants) ValidateAll(topology *ClusterTopology) error {
	if err := i.ValidateNodeHealth(topology.nodes, topology.leaderID); err != nil {
		return fmt.Errorf("invariant violation (I5): %w", err)
	}

	if err := i.ValidateShardCoverage(topology.shardMap); err != nil {
		return fmt.Errorf("invariant violation (I2): %w", err)
	}

	if err := i.ValidateShardOwnership(topology.shardMap); err != nil {
		return fmt.Errorf("invariant violation (I3): %w", err)
	}

	return nil
}

// File: cluster/manager.go (updated)

type ClusterManager struct {
	// ... existing fields ...
	invariants *ClusterInvariants // NEW
}

// MarkNodeFailed marks node as failed with invariant checks
func (cm *ClusterManager) MarkNodeFailed(ctx context.Context, nodeID NodeID, reason string) error {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	// Validate preconditions
	node, exists := cm.nodes[nodeID.String()]
	if !exists {
		return fmt.Errorf("node not found: %s", nodeID)
	}
	if node.Status() == NodeStatusFailed {
		return fmt.Errorf("node already failed: %s", nodeID)
	}

	// Execute command
	failedNode := node.WithStatus(NodeStatusFailed)
	cm.nodes[nodeID.String()] = failedNode
	cm.hashRing.RemoveNode(nodeID.String())

	// Validate invariants still hold
	if err := cm.invariants.ValidateNodeHealth(cm.nodes, cm.currentLeaderID); err != nil {
		return fmt.Errorf("invariant violation after node failure: %w", err)
	}

	// Publish event
	event := &NodeFailed{
		NodeID:    nodeID,
		Reason:    reason,
		Timestamp: time.Now(),
	}
	_ = cm.publishEvent(ctx, event)

	return nil
}

// AssignShards assigns shards with invariant validation
func (cm *ClusterManager) AssignShards(ctx context.Context, newShardMap *ShardMap) error {
	// Only leader can assign
	if !cm.IsLeader() {
		return errors.New("only leader can assign shards")
	}

	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	// Validate preconditions
	if err := cm.invariants.ValidateAll(&ClusterTopology{
		nodes:    cm.nodes,
		shardMap: newShardMap,
		leaderID: cm.currentLeaderID,
	}); err != nil {
		return fmt.Errorf("cannot assign shards: %w", err)
	}

	// Execute command
	oldShardMap := cm.shardMap
	cm.shardMap = newShardMap

	// Publish events
	for shardID, newNodes := range newShardMap.Shards {
		oldNodes := oldShardMap.Shards[shardID]
		if !nodeListEqual(oldNodes, newNodes) {
			event := &ShardMigrated{
				ShardID:   shardID,
				FromNodes: oldNodes,
				ToNodes:   newNodes,
				Timestamp: time.Now(),
			}
			_ = cm.publishEvent(ctx, event)
		}
	}

	return nil
}
```

**Benefits:**
- Invariants defined in one place
- Easy to audit what's being validated
- Consistent application across all commands
- Clear error messages
- Testable in isolation (see the test sketch below)
- Easy to add new invariants
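
Because the invariants are plain functions over the topology, they can be unit-tested without a ClusterManager at all. A minimal table-driven sketch, assuming `ShardMap.Shards` is a `map[int][]string` as in the Pattern 6 test:

```go
// File: cluster/invariants_test.go (hypothetical sketch)

func TestClusterInvariants_ValidateShardCoverage(t *testing.T) {
	inv := NewClusterInvariants(3) // expects shards 0, 1, 2 to be assigned

	cases := []struct {
		name    string
		shards  map[int][]string
		wantErr bool
	}{
		{"all shards assigned", map[int][]string{0: {"node-1"}, 1: {"node-1"}, 2: {"node-2"}}, false},
		{"shard 2 orphaned", map[int][]string{0: {"node-1"}, 1: {"node-1"}}, true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := inv.ValidateShardCoverage(&ShardMap{Shards: tc.shards})
			if (err != nil) != tc.wantErr {
				t.Fatalf("got err=%v, wantErr=%v", err, tc.wantErr)
			}
		})
	}
}
```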

---

## Pattern 5: Rebalancing Strategy

### Current (Stubbed)

```go
// File: cluster/shard.go, line 210
func (chp *ConsistentHashPlacement) RebalanceShards(
	currentMap *ShardMap,
	nodes map[string]*NodeInfo,
) (*ShardMap, error) {
	// This is a simplified implementation
	// In practice, this would implement sophisticated rebalancing logic
	return currentMap, nil // BUG: Returns unchanged!
}
```

### Intended (Real Implementation)

```go
// File: cluster/rebalancing.go (new file)

// RebalancingStrategy defines how to distribute shards across nodes
type RebalancingStrategy interface {
	// Rebalance computes new shard assignments
	// Returns new ShardMap or error if unable to rebalance
	Rebalance(
		current *ShardMap,
		activeNodes map[string]*NodeInfo,
	) (*ShardMap, error)
}

// ConsistentHashRebalancer uses consistent hashing to minimize movements
type ConsistentHashRebalancer struct {
	virtualNodes int
	shardCount   int
}

// NewConsistentHashRebalancer creates a rebalancer
func NewConsistentHashRebalancer(virtualNodes, shardCount int) *ConsistentHashRebalancer {
	return &ConsistentHashRebalancer{
		virtualNodes: virtualNodes,
		shardCount:   shardCount,
	}
}

// Rebalance computes new assignments using consistent hashing
func (chr *ConsistentHashRebalancer) Rebalance(
	current *ShardMap,
	activeNodes map[string]*NodeInfo,
) (*ShardMap, error) {
	if len(activeNodes) == 0 {
		return nil, errors.New("no active nodes to rebalance to")
	}

	// Build new hash ring from active nodes
	ring := NewConsistentHashRingWithConfig(HashRingConfig{
		VirtualNodes: chr.virtualNodes,
	})
	for nodeID := range activeNodes {
		ring.AddNode(nodeID)
	}

	// Reassign each shard via consistent hash
	newAssignments := make(map[int][]string)
	for shardID := 0; shardID < chr.shardCount; shardID++ {
		// Primary node via consistent hash
		primaryNode := ring.GetNode(fmt.Sprintf("shard-%d", shardID))
		if primaryNode == "" {
			return nil, fmt.Errorf("no node assigned for shard %d", shardID)
		}

		// TODO: Add replicas (for now: single replica)
		newAssignments[shardID] = []string{primaryNode}
	}

	return &ShardMap{
		Version:    current.Version + 1,
		Shards:     newAssignments,
		Nodes:      activeNodes,
		UpdateTime: time.Now(),
	}, nil
}

// LoadBalancingRebalancer assigns based on current load (future strategy)
type LoadBalancingRebalancer struct {
	shardCount int
}

// Rebalance assigns shards to least-loaded nodes
func (lbr *LoadBalancingRebalancer) Rebalance(
	current *ShardMap,
	activeNodes map[string]*NodeInfo,
) (*ShardMap, error) {
	// Sort nodes by load
	type nodeLoad struct {
		id   string
		load float64
	}
	var nodes []nodeLoad
	for id, node := range activeNodes {
		nodes = append(nodes, nodeLoad{id, node.Load})
	}
	sort.Slice(nodes, func(i, j int) bool {
		return nodes[i].load < nodes[j].load
	})

	// Assign each shard to least-loaded node
	newAssignments := make(map[int][]string)
	for shardID := 0; shardID < lbr.shardCount; shardID++ {
		// Round-robin through sorted nodes
		idx := shardID % len(nodes)
		newAssignments[shardID] = []string{nodes[idx].id}
	}

	return &ShardMap{
		Version:    current.Version + 1,
		Shards:     newAssignments,
		Nodes:      activeNodes,
		UpdateTime: time.Now(),
	}, nil
}

// File: cluster/manager.go (updated)

// RebalanceShards coordinates rebalancing
func (cm *ClusterManager) RebalanceShards(ctx context.Context, reason string) error {
	if !cm.IsLeader() {
		return errors.New("only leader can rebalance")
	}

	cm.mutex.Lock()

	// Get active nodes
	activeNodes := make(map[string]*NodeInfo)
	for id, node := range cm.nodes {
		if node.Status() == NodeStatusActive {
			activeNodes[id] = node
		}
	}

	if len(activeNodes) == 0 {
		cm.mutex.Unlock()
		return errors.New("no active nodes to rebalance to")
	}

	// Publish rebalancing started
	startEvent := &RebalancingTriggered{
		LeaderID:  NodeID{id: cm.currentLeaderID},
		Reason:    reason,
		Timestamp: time.Now(),
	}
	_ = cm.publishEvent(ctx, startEvent)

	// Compute new assignments
	strategy := NewConsistentHashRebalancer(DefaultVirtualNodes, DefaultNumShards)
	newShardMap, err := strategy.Rebalance(cm.shardMap, activeNodes)
	if err != nil {
		cm.mutex.Unlock()
		return fmt.Errorf("rebalancing strategy failed: %w", err)
	}

	// Validate new assignments
	if err := cm.invariants.ValidateAll(&ClusterTopology{
		nodes:    cm.nodes,
		shardMap: newShardMap,
		leaderID: cm.currentLeaderID,
	}); err != nil {
		cm.mutex.Unlock()
		return fmt.Errorf("new shard map violates invariants: %w", err)
	}

	// Apply new assignments
	oldShardMap := cm.shardMap
	cm.shardMap = newShardMap

	migratedCount := 0
	for shardID, newNodes := range newShardMap.Shards {
		oldNodes := oldShardMap.Shards[shardID]
		if !nodeListEqual(oldNodes, newNodes) {
			migratedCount++
			// Publish event for each migration
			event := &ShardMigrated{
				ShardID:   shardID,
				FromNodes: stringListToNodeIDList(oldNodes),
				ToNodes:   stringListToNodeIDList(newNodes),
				Timestamp: time.Now(),
			}
			_ = cm.publishEvent(ctx, event)
		}
	}

	cm.mutex.Unlock()

	// Publish rebalancing completed
	completeEvent := &RebalancingCompleted{
		LeaderID:      NodeID{id: cm.currentLeaderID},
		MigratedCount: migratedCount,
		CompletedAt:   time.Now(),
	}
	_ = cm.publishEvent(ctx, completeEvent)

	return nil
}
```

**Benefits:**
- Strategy pattern allows multiple algorithms
- Real rebalancing actually redistributes shards
- New strategies can be plugged in (e.g., load-aware)
- Invariants checked before applying
- Events published for observability
- Testable in isolation
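
RebalanceShards above relies on two helpers, `nodeListEqual` and `stringListToNodeIDList`, that are not shown elsewhere in this document. One plausible implementation, assuming shard assignments are order-significant lists of node ID strings:

```go
// File: cluster/rebalancing.go (hypothetical sketch of the helpers used above)

// nodeListEqual reports whether two shard assignments are identical,
// treating order as significant (primary first, then replicas).
func nodeListEqual(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// stringListToNodeIDList converts raw node ID strings into NodeID value objects
// for use in domain events.
func stringListToNodeIDList(ids []string) []NodeID {
	out := make([]NodeID, 0, len(ids))
	for _, id := range ids {
		nodeID, err := NewNodeID(id)
		if err != nil {
			continue // assumption: silently skip malformed IDs in this sketch
		}
		out = append(out, nodeID)
	}
	return out
}
```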

---

## Pattern 6: Testing Aggregates

### Current (Hard to Test)

```go
// Testing is difficult because:
// 1. No dependency injection (NATS, KV store hardcoded)
// 2. No way to verify events (none published)
// 3. No way to inject clock (time.Now() hardcoded)
// 4. All state is private; hard to assert

func TestClusterManager_JoinNode(t *testing.T) {
	// Can't create without real NATS connection!
	natsConn, _ := nats.Connect(nats.DefaultURL)
	defer natsConn.Close()

	ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
	cm, _ := NewClusterManager("node-1", natsConn, ctx)

	// Can't control time
	// Can't verify events
	// Can't assert invariants
}
```

### Intended (Testable with Mocks)

```go
// File: cluster/manager_test.go

// MockEventPublisher captures published events for testing
type MockEventPublisher struct {
	events []interface{}
	mu     sync.Mutex
}

func (m *MockEventPublisher) Publish(ctx context.Context, event interface{}) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.events = append(m.events, event)
	return nil
}

func (m *MockEventPublisher) GetEvents(t *testing.T) []interface{} {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.events
}

func (m *MockEventPublisher) Clear() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.events = []interface{}{}
}

// MockClock allows controlling time in tests
type MockClock struct {
	now time.Time
}

func (mc *MockClock) Now() time.Time {
	return mc.now
}

func (mc *MockClock) Advance(d time.Duration) {
	mc.now = mc.now.Add(d)
}

// ClusterManagerWithClock allows injecting a clock
type ClusterManager struct {
	// ... existing fields ...
	clock Clock // NEW
}

type Clock interface {
	Now() time.Time
}

// Test: JoinCluster publishes NodeJoined event
func TestClusterManager_JoinCluster_PublishesEvent(t *testing.T) {
	// Arrange
	publisher := &MockEventPublisher{}
	cm := &ClusterManager{
		nodes:          make(map[string]*NodeInfo),
		hashRing:       NewConsistentHashRing(),
		eventPublisher: publisher,
		invariants:     NewClusterInvariants(1024),
	}

	nodeInfo, _ := NewNodeInfo("node-1", "localhost", 8080, 1000)

	// Act
	ctx := context.Background()
	err := cm.JoinCluster(ctx, nodeInfo)

	// Assert
	if err != nil {
		t.Fatalf("JoinCluster failed: %v", err)
	}

	events := publisher.GetEvents(t)
	if len(events) != 1 {
		t.Fatalf("expected 1 event, got %d", len(events))
	}

	joinedEvent, ok := events[0].(*NodeJoined)
	if !ok {
		t.Fatalf("expected NodeJoined event, got %T", events[0])
	}

	if joinedEvent.NodeID.String() != "node-1" {
		t.Errorf("expected node-1, got %s", joinedEvent.NodeID)
	}
}

// Test: MarkNodeFailed with invariant violation
func TestClusterManager_MarkNodeFailed_ValidatesInvariants(t *testing.T) {
	// Arrange
	publisher := &MockEventPublisher{}
	cm := &ClusterManager{
		nodes:           make(map[string]*NodeInfo),
		hashRing:        NewConsistentHashRing(),
		eventPublisher:  publisher,
		currentLeaderID: "node-1",
		invariants:      NewClusterInvariants(1024),
	}

	// Only one node: the leader
	node1, _ := NewNodeInfo("node-1", "localhost", 8080, 1000)
	cm.nodes["node-1"] = node1

	// Act: Try to fail the only (leader) node
	ctx := context.Background()
	nodeID, _ := NewNodeID("node-1")
	err := cm.MarkNodeFailed(ctx, nodeID, "test")

	// Assert: Should fail because it violates Invariant 5 (leader must be active)
	if err == nil {
		t.Fatal("expected error when failing leader, got nil")
	}
	if !strings.Contains(err.Error(), "invariant") {
		t.Errorf("expected invariant error, got: %v", err)
	}
}

// Test: Rebalance uses strategy to compute assignments
func TestClusterManager_RebalanceShards_UsesStrategy(t *testing.T) {
	// Arrange
	publisher := &MockEventPublisher{}
	cm := &ClusterManager{
		nodes:           make(map[string]*NodeInfo),
		hashRing:        NewConsistentHashRing(),
		shardMap:        &ShardMap{Shards: make(map[int][]string)},
		currentLeaderID: "node-1",
		eventPublisher:  publisher,
		invariants:      NewClusterInvariants(10), // 10 shards for test
	}

	// Add nodes
	for i := 1; i <= 2; i++ {
		id := fmt.Sprintf("node-%d", i)
		node, _ := NewNodeInfo(id, "localhost", 8080+i, 1000)
		cm.nodes[id] = node
		cm.hashRing.AddNode(id)
	}

	// Act: Rebalance
	ctx := context.Background()
	err := cm.RebalanceShards(ctx, "test")

	// Assert
	if err != nil {
		t.Fatalf("RebalanceShards failed: %v", err)
	}

	// Check that shards are now assigned
	assignedCount := len(cm.shardMap.Shards)
	if assignedCount != 10 {
		t.Errorf("expected 10 shards assigned, got %d", assignedCount)
	}

	// Check that events were published
	events := publisher.GetEvents(t)
	hasShardMigrated := false
	for _, event := range events {
		if _, ok := event.(*ShardMigrated); ok {
			hasShardMigrated = true
			break
		}
	}
	if !hasShardMigrated {
		t.Error("expected at least one ShardMigrated event")
	}
}
```

**Benefits:**
- Dependency injection (publisher, clock, strategy; a clock-driven test is sketched below)
- Easy to verify events
- Can test invariant validation
- Can test without NATS
- Clear, maintainable tests
- Behavior-focused (what happened, not how)
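
The injected Clock is declared above but not yet exercised. The sketch below shows what it enables for time-based behavior; it assumes a checkNodeHealth variant that reads cm.clock.Now() and replaces map entries via WithStatus when the 90-second heartbeat timeout from Pattern 4 elapses, neither of which is shown in this document.

```go
// Hypothetical test, assuming checkNodeHealth uses cm.clock.Now()
// and the 90-second heartbeat timeout from Pattern 4.
func TestClusterManager_CheckNodeHealth_UsesInjectedClock(t *testing.T) {
	clock := &MockClock{now: time.Now()}
	cm := &ClusterManager{
		nodes:          make(map[string]*NodeInfo),
		hashRing:       NewConsistentHashRing(),
		eventPublisher: &MockEventPublisher{},
		invariants:     NewClusterInvariants(1024),
		clock:          clock,
	}

	node, _ := NewNodeInfo("node-1", "localhost", 8080, 1000)
	cm.nodes["node-1"] = node.WithLastSeen(clock.Now())

	// Before the timeout, the node stays active.
	cm.checkNodeHealth()
	if cm.nodes["node-1"].Status() != NodeStatusActive {
		t.Fatal("node should still be active before the heartbeat timeout")
	}

	// Advance past the 90-second timeout; no real waiting needed.
	clock.Advance(2 * time.Minute)
	cm.checkNodeHealth()
	if cm.nodes["node-1"].Status() != NodeStatusFailed {
		t.Error("node should be marked failed after the heartbeat timeout")
	}
}
```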

---

## Summary: Key Patterns to Adopt

| Pattern | Current | Intended | Benefit |
|---------|---------|----------|---------|
| Commands | Message handlers | Explicit methods | Clear intent |
| Events | None published | First-class domain events | Event-driven, auditable |
| Validation | Scattered | Centralized invariants | Consistent, testable |
| Immutability | Mutable state | Value objects, copy-on-write | Prevents bugs |
| Strategy | Stubbed | Real implementation | Actually works |
| Testing | Hard (coupled) | Dependency injection, mocks | Easy, comprehensive |

---

## References

- [DOMAIN_MODEL.md](./DOMAIN_MODEL.md) - Full domain model
- [REFACTORING_SUMMARY.md](./REFACTORING_SUMMARY.md) - Implementation roadmap
- [manager.go](./manager.go) - Current implementation
- [leader.go](./leader.go) - LeaderElection implementation