Cluster Coordination: DDD Refactoring Summary

Overview

The Cluster Coordination bounded context manages distributed topology (nodes, shards, leadership) for Aether's actor system. This document highlights gaps between the intended DDD model and current implementation, with prioritized refactoring recommendations.


Current State: Code vs Domain Model

What's Working Well

  1. LeaderElection aggregate (✓)

    • Correctly uses NATS KV atomic operations to enforce "single leader per term"
    • Lease renewal every 3s + expiration after 10s prevents split-brain
    • Lease-based approach simpler than Raft; good for this context
  2. ConsistentHashRing utility (✓)

    • Properly implements consistent hashing with virtual nodes (150 per node)
    • Minimizes shard reshuffling on topology changes
    • Thread-safe via RWMutex
  3. NodeDiscovery (✓)

    • Heartbeat mechanism (every 30s) for membership discovery
  • Failure detection via missed heartbeats (90s timeout in ClusterManager)
    • Graceful shutdown signal (NodeLeft)
  4. Architecture (interfaces) (✓)

    • VMRegistry interface decouples cluster package from runtime
    • Runtime interface avoids import cycles
    • PlacementStrategy pattern allows pluggable rebalancing algorithms
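
For reference, a minimal sketch of what the PlacementStrategy seam might look like; the method signature follows the RebalanceShards example in Phase 3 below, and the exact interface in the codebase may differ.

// Hypothetical shape of the pluggable placement seam. The signature mirrors
// the RebalanceShards example shown in Phase 3; the real interface may differ.
type PlacementStrategy interface {
    // RebalanceShards computes a new shard map for the given active nodes.
    RebalanceShards(currentMap *ShardMap, activeNodes map[string]*NodeInfo) (*ShardMap, error)
}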

What Needs Work

Gap 1: Anemic Domain Model

Problem: ClusterManager and ShardManager lack explicit commands and domain events; they are mostly passive data holders.

Evidence:

  • ClusterManager: stores state (nodes, shardMap, hashRing) but no command handlers
  • Node updates handled via generic message dispatcher (handleClusterMessage), not domain commands
  • No event publishing; state changes are silent

Example:

// Current (anemic):
cm.nodes[update.Node.ID] = update.Node
cm.hashRing.AddNode(update.Node.ID)

// Intended (DDD):
event := cm.JoinCluster(nodeInfo)  // Command
eventBus.Publish(event)            // Event: NodeJoined

Refactoring: Extract command methods with explicit, intention-revealing names

  • Add JoinCluster(nodeInfo) command handler
  • Add MarkNodeFailed(nodeID, reason) command handler
  • Add AssignShards(shardMap) command handler
  • Publish NodeJoined, NodeFailed, ShardAssigned events

Gap 2: No Event Sourcing

Problem: Topology changes don't produce events; impossible to audit "who owned shard 42 at 3pm?"

Evidence:

  • No event store integration (events captured in code comments, not persisted)
  • LeaderElection uses callbacks instead of publishing events
  • No audit trail of topology decisions

Impact: Can't rebuild topology state, can't debug rebalancing decisions, can't integrate with other contexts via events.

Refactoring: Introduce event publishing

  • Add EventPublisher interface to aggregates
  • Publish LeaderElected, LeadershipLost, LeadershipRenewed events
  • Publish NodeJoined, NodeLeft, NodeFailed events
  • Publish ShardAssigned, ShardMigrated events
  • Store events in event store (optional: in-memory for now)
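
For the "in-memory for now" option, a minimal sketch of an event store that satisfies the EventPublisher interface defined in Phase 2 (InMemoryEventStore is a hypothetical name; requires sync):

// InMemoryEventStore is a hypothetical, minimal EventPublisher: it appends
// published events to a slice, which is enough for tests and early integration.
type InMemoryEventStore struct {
    mu     sync.Mutex
    events []interface{}
}

func (s *InMemoryEventStore) Publish(event interface{}) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.events = append(s.events, event)
    return nil
}

// Events returns a copy of everything published so far.
func (s *InMemoryEventStore) Events() []interface{} {
    s.mu.Lock()
    defer s.mu.Unlock()
    return append([]interface{}(nil), s.events...)
}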

Gap 3: Responsibility Split (Cluster vs ShardAssignment)

Problem: Cluster topology (ClusterManager) and shard assignment (ShardManager) are separate aggregates without clear ownership of invariants.

Evidence:

  • ClusterManager decides "node failed, trigger rebalance"
  • ShardManager does "compute new assignments"
  • No one validates "new assignment only uses healthy nodes"

Risk: Concurrent rebalancing from multiple nodes; stale assignments to failed nodes; orphaned shards.

Refactoring: Unify under Cluster aggregate root (or establish clear interface)

  • ClusterManager owns Cluster aggregate (nodes, shards, leadership)
  • ShardManager becomes ShardAssignment aggregate (or ShardingPolicy utility)
  • Only Cluster can issue ShardAssigned commands
  • ShardManager validates invariants (all nodes healthy, all shards assigned)
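
One possible shape of that boundary, sketched under the assumption that ShardManager shrinks to a pure compute/validate role (ShardingPolicy, Rebalance, and cm.activeNodes() are illustrative names, not existing code):

// ShardingPolicy is a hypothetical narrowed role for ShardManager: it computes
// and validates assignments but never applies them.
type ShardingPolicy interface {
    ComputeAssignments(current *ShardMap, activeNodes map[string]*NodeInfo) (*ShardMap, error)
    ValidateAssignments(proposed *ShardMap, activeNodes map[string]*NodeInfo) error
}

// Only the Cluster aggregate applies a proposed map, so there is a single
// writer for shard ownership. Locking is omitted for brevity.
func (cm *ClusterManager) Rebalance(policy ShardingPolicy) error {
    proposed, err := policy.ComputeAssignments(cm.shardMap, cm.activeNodes())
    if err != nil {
        return err
    }
    if err := policy.ValidateAssignments(proposed, cm.activeNodes()); err != nil {
        return err
    }
    return cm.AssignShards(proposed) // command handler sketched in Phase 4
}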

Gap 4: Rebalancing Logic Incomplete

Problem: PlacementStrategy.RebalanceShards is stubbed; actual rebalancing doesn't happen.

Evidence: ConsistentHashPlacement.RebalanceShards returns currentMap unchanged (line 214, shard.go)

Impact: Adding a node or removing a failed node doesn't actually redistribute shards to new nodes.

Refactoring: Implement real rebalancing

  • Use ConsistentHashRing to compute new assignments
  • Minimize shard movement (virtual nodes help, but still need to compute delta)
  • Verify no shard orphaning after new topology
  • Test: adding node should redistribute ~1/N shards to it
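
A test along the lines of the last point could look roughly like this; it targets the Phase 3 signature, and makeEmptyShards, the zero-value ConsistentHashPlacement, and the distribution tolerance are assumptions:

func TestRebalanceShards_NewNodeReceivesShare(t *testing.T) {
    nodes := map[string]*NodeInfo{
        "node-1": {ID: "node-1", Status: NodeStatusActive},
        "node-2": {ID: "node-2", Status: NodeStatusActive},
        "node-3": {ID: "node-3", Status: NodeStatusActive},
    }
    cp := &ConsistentHashPlacement{} // zero-value construction is an assumption

    // makeEmptyShards is a hypothetical helper returning a map with 1024 shard keys.
    current, err := cp.RebalanceShards(&ShardMap{Shards: makeEmptyShards(1024)}, nodes)
    if err != nil {
        t.Fatal(err)
    }

    // Add a fourth node and rebalance again.
    nodes["node-4"] = &NodeInfo{ID: "node-4", Status: NodeStatusActive}
    rebalanced, err := cp.RebalanceShards(current, nodes)
    if err != nil {
        t.Fatal(err)
    }

    // The new node should own roughly 1/4 of 1024 shards; allow generous slack.
    owned := 0
    for _, owners := range rebalanced.Shards {
        if len(owners) > 0 && owners[0] == "node-4" {
            owned++
        }
    }
    if owned < 128 || owned > 512 {
        t.Fatalf("expected roughly 256 shards on node-4, got %d", owned)
    }
}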

Gap 5: Invariant Validation Scattered

Problem: Invariants checked in multiple places; easy to miss a case.

Evidence:

  • Node failure detection in handleNodeUpdate (line 191)
  • Duplicate check in checkNodeHealth (line 283)
  • No central validation that "all shards in [0, ShardCount) are assigned"

Refactoring: Centralize invariant validation

  • Add Cluster.ValidateTopology() method
  • Add ShardAssignment.ValidateAssignments() method
  • Call validation after every topology change
  • Test: add node, verify all shards assigned and no orphans
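
A sketch of what the centralized check might look like; ShardCount and the exact field names are assumptions drawn from the rest of this document:

// ValidateTopology is a single place to check cluster invariants after any
// topology change. ShardCount is assumed to be a package-level constant.
func (cm *ClusterManager) ValidateTopology() error {
    for shardID := 0; shardID < ShardCount; shardID++ {
        owners, ok := cm.shardMap.Shards[shardID]
        if !ok || len(owners) == 0 {
            return fmt.Errorf("shard %d has no owner", shardID)
        }
        for _, nodeID := range owners {
            node, exists := cm.nodes[nodeID]
            if !exists || node.Status != NodeStatusActive {
                return fmt.Errorf("shard %d assigned to unknown or inactive node %s", shardID, nodeID)
            }
        }
    }
    return nil
}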

Gap 6: LeaderElection Uses Callbacks, Not Events

Problem: Leadership changes trigger callbacks (OnBecameLeader, OnNewLeader); no events for other contexts.

Evidence:

// Current (callbacks in manager.go lines 54-63)
callbacks := LeaderElectionCallbacks{
    OnBecameLeader: func() { cm.logger.Printf("...") },
    ...
}

// Intended (events published to event bus)
eventBus.Publish(LeaderElected{LeaderID, Term, ExpiresAt})

Refactoring: Publish events instead of (or in addition to) callbacks

  • Publish LeaderElected event
  • Publish LeadershipLost event
  • Events captured in event store, enabling other contexts to react
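
For concreteness, possible definitions of the two leadership events; LeaderElected's fields follow the example above, while the field types and LeadershipLost's shape are assumptions:

// Leadership events. LeaderElected matches the fields used in the example
// above; LeadershipLost is a plausible counterpart, not existing code.
type LeaderElected struct {
    LeaderID  string
    Term      uint64
    ExpiresAt time.Time
}

type LeadershipLost struct {
    LeaderID  string
    Term      uint64
    Timestamp time.Time
}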

Refactoring Priority Matrix

High Priority (Blocks Event-Driven Integration)

| ID | Issue | Effort | Impact | Reason |
|----|-------|--------|--------|--------|
| 1 | Extract Cluster aggregate with explicit commands | Med | High | Unblocks event publishing; enables other contexts to react |
| 2 | Implement PlacementStrategy.RebalanceShards | Med | High | Rebalancing currently doesn't work; critical for node scaling |
| 3 | Publish domain events (NodeJoined, ShardAssigned, etc.) | Med | High | Enables event sourcing, audit trail, inter-context communication |

Medium Priority (Improves Clarity & Robustness)

| ID | Issue | Effort | Impact | Reason |
|----|-------|--------|--------|--------|
| 4 | Extract MarkNodeFailed command handler | Low | Med | Consolidates node failure logic; improves intent clarity |
| 5 | Unify ShardAssignment invariant validation | Low | Med | Prevents orphaned shards; catches bugs early |
| 6 | Add shard migration tracking | High | Med | Prevents rebalancing while migrations in flight |
| 7 | Publish LeaderElection events | Low | Med | Improves observability; auditable leadership changes |

Low Priority (Nice to Have)

| ID | Issue | Effort | Impact | Reason |
|----|-------|--------|--------|--------|
| 8 | Add GetNodeHealth read model | Low | Low | Monitoring/debugging; not core to coordination |
| 9 | Add rebalancing status tracking | Low | Low | Observability; doesn't affect correctness |

Refactoring Plan (First Sprint)

Phase 1: Extract Cluster Commands (Week 1)

Goal: Make cluster topology changes explicit and intent-driven.

// Add to ClusterManager

// JoinCluster adds a node to the cluster
func (cm *ClusterManager) JoinCluster(nodeInfo *NodeInfo) error {
    cm.mutex.Lock()
    defer cm.mutex.Unlock()

    // Validate
    if nodeInfo.ID == "" {
        return errors.New("node ID empty")
    }
    if nodeInfo.Capacity <= 0 {
        return errors.New("node capacity must be > 0")
    }

    // Command execution
    cm.nodes[nodeInfo.ID] = nodeInfo
    cm.hashRing.AddNode(nodeInfo.ID)

    // Event: publish NodeJoined
    cm.publishEvent(&NodeJoined{
        NodeID:    nodeInfo.ID,
        Address:   nodeInfo.Address,
        Capacity:  nodeInfo.Capacity,
        Timestamp: time.Now(),
    })

    // Trigger rebalancing if leader
    if cm.IsLeader() {
        go cm.triggerShardRebalancing("node joined")
    }

    return nil
}

// MarkNodeFailed marks a node as failed
func (cm *ClusterManager) MarkNodeFailed(nodeID string, reason string) error {
    cm.mutex.Lock()
    defer cm.mutex.Unlock()

    node, exists := cm.nodes[nodeID]
    if !exists {
        return fmt.Errorf("node not found: %s", nodeID)
    }
    if node.Status == NodeStatusFailed {
        return fmt.Errorf("node already failed: %s", nodeID)
    }

    // Command execution
    node.Status = NodeStatusFailed
    cm.hashRing.RemoveNode(nodeID)

    // Event: publish NodeFailed
    cm.publishEvent(&NodeFailed{
        NodeID:    nodeID,
        Reason:    reason,
        Timestamp: time.Now(),
    })

    // Trigger rebalancing if leader
    if cm.IsLeader() {
        go cm.triggerShardRebalancing("node failed")
    }

    return nil
}

Deliverables:

  • ClusterManager.JoinCluster(nodeInfo) command
  • ClusterManager.MarkNodeFailed(nodeID, reason) command
  • ClusterManager.publishEvent() helper
  • Events: NodeJoined, NodeFailed (defined but not yet stored)
  • Tests: verify commands validate invariants, trigger events

Blocking Dependency: EventPublisher interface (Phase 2)
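
Until the Phase 2 interface lands, publishEvent can be a thin nil-tolerant wrapper; a possible sketch (the publisher field is the one introduced in Phase 2):

// publishEvent forwards a domain event to the configured publisher. It is a
// no-op when no publisher is wired in yet, so Phase 1 can ship ahead of Phase 2.
func (cm *ClusterManager) publishEvent(event interface{}) {
    if cm.publisher == nil {
        return
    }
    if err := cm.publisher.Publish(event); err != nil {
        cm.logger.Printf("failed to publish event %T: %v", event, err)
    }
}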


Phase 2: Publish Domain Events (Week 2)

Goal: Make topology changes observable and auditable.

// Add EventPublisher interface
type EventPublisher interface {
    Publish(event interface{}) error
}

// ClusterManager uses it
type ClusterManager struct {
    // ...
    publisher EventPublisher
}

// Define domain events
type NodeJoined struct {
    NodeID    string
    Address   string
    Capacity  float64
    Timestamp time.Time
}

type NodeFailed struct {
    NodeID    string
    Reason    string
    Timestamp time.Time
}

type ShardAssigned struct {
    ShardID   int
    NodeIDs   []string
    Version   uint64
    Timestamp time.Time
}

type ShardMigrated struct {
    ShardID    int
    FromNodes  []string
    ToNodes    []string
    Timestamp  time.Time
}

Deliverables:

  • EventPublisher interface
  • Domain events: NodeJoined, NodeFailed, ShardAssigned, ShardMigrated, RebalancingTriggered, RebalancingCompleted
  • LeaderElection publishes LeaderElected, LeadershipLost
  • Events published to NATS (via NATSEventBus) for cross-context communication
  • Tests: verify events published correctly
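
A possible shape for the NATS-side publisher mentioned above; the envelope, single-subject naming, and JSON encoding are assumptions, and the real NATSEventBus may look different (requires encoding/json and github.com/nats-io/nats.go):

// eventEnvelope wraps a domain event with its type name so subscribers can
// decode the payload; the envelope format is an assumption.
type eventEnvelope struct {
    Type    string          `json:"type"`
    Payload json.RawMessage `json:"payload"`
}

// natsEventPublisher is a hypothetical EventPublisher that publishes every
// event on a single subject, e.g. "aether.cluster.events".
type natsEventPublisher struct {
    conn    *nats.Conn
    subject string
}

func (p *natsEventPublisher) Publish(event interface{}) error {
    payload, err := json.Marshal(event)
    if err != nil {
        return err
    }
    data, err := json.Marshal(eventEnvelope{
        Type:    fmt.Sprintf("%T", event), // e.g. "*cluster.NodeJoined"
        Payload: payload,
    })
    if err != nil {
        return err
    }
    return p.conn.Publish(p.subject, data)
}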

Phase 3: Implement Real Rebalancing (Week 3)

Goal: Make rebalancing actually redistribute shards to new nodes.

// In ShardManager (or separate RebalancingStrategy)

func (cp *ConsistentHashPlacement) RebalanceShards(
    currentMap *ShardMap,
    activeNodes map[string]*NodeInfo,
) (*ShardMap, error) {
    if len(activeNodes) == 0 {
        return nil, errors.New("no active nodes")
    }

    // Build new hash ring from current nodes
    ring := NewConsistentHashRingWithConfig(DefaultHashRingConfig())
    for nodeID := range activeNodes {
        ring.AddNode(nodeID)
    }

    // Reassign each shard via consistent hash
    newAssignments := make(map[int][]string)
    for shardID := 0; shardID < len(currentMap.Shards); shardID++ {
        primaryNode := ring.GetNode(fmt.Sprintf("shard-%d", shardID))
        newAssignments[shardID] = []string{primaryNode}

        // TODO: add replicas based on replication factor
    }

    return &ShardMap{
        Version:    currentMap.Version + 1,
        Shards:     newAssignments,
        Nodes:      activeNodes,
        UpdateTime: time.Now(),
    }, nil
}

Deliverables:

  • ConsistentHashPlacement.RebalanceShards implemented (not stubbed)
  • Handles node addition (redistribute to new node)
  • Handles node removal (redistribute from failed node)
  • Tests: adding node redistributes ~1/N shards; removing node doesn't orphan shards

Phase 4: Unify ShardAssignment Invariants (Week 4)

Goal: Validate shard assignments are safe before applying.

// In ClusterManager

func (cm *ClusterManager) AssignShards(newShardMap *ShardMap) error {
    cm.mutex.Lock()
    defer cm.mutex.Unlock()

    // Validate: all shards in [0, ShardCount) are assigned (1024 here)
    allShards := make(map[int]bool)
    for shardID := range newShardMap.Shards {
        allShards[shardID] = true
    }
    for i := 0; i < 1024; i++ {
        if !allShards[i] {
            return fmt.Errorf("shard %d not assigned", i)
        }
    }

    // Validate: all nodes are known and healthy
    for _, nodeList := range newShardMap.Shards {
        for _, nodeID := range nodeList {
            node, exists := cm.nodes[nodeID]
            if !exists || node.Status != NodeStatusActive {
                return fmt.Errorf("shard assigned to unknown or unhealthy node: %s", nodeID)
            }
        }
    }

    // Apply new assignments, keeping the old map around to diff against
    oldShardMap := cm.shardMap
    cm.shardMap = newShardMap

    // Publish an event for each shard whose assignment changed
    var oldShards map[int][]string
    if oldShardMap != nil {
        oldShards = oldShardMap.Shards
    }
    for shardID, nodeList := range newShardMap.Shards {
        oldNodes := oldShards[shardID]
        if !stringSliceEqual(oldNodes, nodeList) {
            cm.publishEvent(&ShardMigrated{
                ShardID:   shardID,
                FromNodes: oldNodes,
                ToNodes:   nodeList,
                Timestamp: time.Now(),
            })
        }
    }

    return nil
}
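
The snippet above relies on a stringSliceEqual helper that is not shown; a minimal order-sensitive version could be:

// stringSliceEqual reports whether two string slices are equal element by
// element. It is order-sensitive; sort both slices first if assignment order
// is not meaningful.
func stringSliceEqual(a, b []string) bool {
    if len(a) != len(b) {
        return false
    }
    for i := range a {
        if a[i] != b[i] {
            return false
        }
    }
    return true
}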

Deliverables:

  • ShardAssignment invariant validation (all shards assigned, only healthy nodes)
  • AssignShards command handler in ClusterManager
  • Publish ShardMigrated events
  • Tests: reject assignment with orphaned shards; reject assignment to failed node

Testing Checklist

Unit Tests (Phase 1-2)

  • JoinCluster command validates node ID is unique
  • MarkNodeFailed command validates node exists
  • Commands trigger events
  • Commands fail on invalid input (empty ID, negative capacity)
  • Commands fail if not leader (AssignShards, RebalanceShards)
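
A sketch of how the input-validation and event tests might look, assuming a hypothetical newTestClusterManager helper that wires in the in-memory store from Gap 2:

func TestJoinCluster_ValidatesInputAndPublishes(t *testing.T) {
    store := &InMemoryEventStore{}
    cm := newTestClusterManager(store) // hypothetical test constructor

    if err := cm.JoinCluster(&NodeInfo{ID: "", Capacity: 1}); err == nil {
        t.Fatal("expected error for empty node ID")
    }
    if err := cm.JoinCluster(&NodeInfo{ID: "node-1", Capacity: 0}); err == nil {
        t.Fatal("expected error for non-positive capacity")
    }

    // A valid join should succeed and publish exactly one NodeJoined event.
    if err := cm.JoinCluster(&NodeInfo{ID: "node-1", Capacity: 1}); err != nil {
        t.Fatal(err)
    }
    events := store.Events()
    if len(events) != 1 {
        t.Fatalf("expected 1 event, got %d", len(events))
    }
    if _, ok := events[0].(*NodeJoined); !ok {
        t.Fatalf("expected *NodeJoined, got %T", events[0])
    }
}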

Integration Tests (Phase 3-4)

  • Single leader election (3 nodes)
  • Leader failure → new leader elected within 10s
  • Node join → shards redistributed to new node
  • Node failure → shards reassigned from failed node
  • Graceful shutdown → node removed immediately, no 90s failure timeout
  • No orphaned shards after rebalancing

Chaos Tests (Phase 4)

  • Leader fails mid-rebalance → new leader resumes
  • Network partition → split-brain prevented by lease
  • Cascading failures → cluster stabilizes
  • High churn (nodes join/leave rapidly) → topology converges

Success Metrics

After Phase 1 (Explicit Commands)

  • ✓ ClusterManager has JoinCluster, MarkNodeFailed command methods
  • ✓ Commands validate preconditions
  • ✓ Commands trigger rebalancing if leader

After Phase 2 (Domain Events)

  • ✓ NodeJoined, NodeFailed, ShardAssigned events published
  • ✓ LeaderElection publishes LeaderElected, LeadershipLost events
  • ✓ Events visible in NATS pub/sub for other contexts

After Phase 3 (Real Rebalancing)

  • ✓ PlacementStrategy actually redistributes shards
  • ✓ Adding node → shards assigned to it
  • ✓ Removing node → shards reassigned elsewhere
  • ✓ No orphaned shards

After Phase 4 (Unified Invariants)

  • ✓ Invalid assignments rejected (unhealthy node, orphaned shard)
  • ✓ All shard changes trigger events
  • ✓ Cluster invariants validated before applying topology

Integration with Other Contexts

Once Cluster Coordination publishes domain events, other contexts can consume them:

Actor Runtime Context

  • Subscribes to: ShardMigrated
  • Actions: Migrate actors from old node to new node
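
As an illustration of the consumer side over NATS, reusing the envelope and single-subject convention from the Phase 2 publisher sketch (all of which are assumptions), a subscriber in the Actor Runtime context might look like:

// subscribeShardMigrations is a hypothetical Actor Runtime subscriber: it
// decodes ShardMigrated events from the shared subject and hands them to a
// migration callback. Requires encoding/json, strings, and nats.go.
func subscribeShardMigrations(nc *nats.Conn, subject string, migrate func(ShardMigrated)) (*nats.Subscription, error) {
    return nc.Subscribe(subject, func(msg *nats.Msg) {
        var env eventEnvelope
        if err := json.Unmarshal(msg.Data, &env); err != nil {
            return // malformed envelope; a real handler would log this
        }
        if !strings.HasSuffix(env.Type, "ShardMigrated") {
            return // not the event this context cares about
        }
        var event ShardMigrated
        if err := json.Unmarshal(env.Payload, &event); err != nil {
            return
        }
        migrate(event)
    })
}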

Monitoring Context

  • Subscribes to: NodeJoined, NodeFailed, LeaderElected
  • Actions: Update cluster health dashboard

Audit Context

  • Subscribes to: NodeJoined, NodeFailed, ShardAssigned, LeaderElected
  • Actions: Record topology change log

References