All checks were successful
CI / build (pull_request) Successful in 16s
- Add sync.RWMutex to ConsistentHashRing struct - Use Lock/Unlock for write operations (AddNode, RemoveNode) - Use RLock/RUnlock for read operations (GetNode, GetNodes, IsEmpty) This allows concurrent reads (the common case) while serializing writes, preventing race conditions when multiple goroutines access the hash ring. Closes #35 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
139 lines
3.4 KiB
Go
139 lines
3.4 KiB
Go
package cluster
|
|
|
|
import (
|
|
"crypto/sha256"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"sort"
|
|
"sync"
|
|
)
|
|
|
|
// ConsistentHashRing implements a consistent hash ring for shard distribution
|
|
type ConsistentHashRing struct {
|
|
mu sync.RWMutex
|
|
ring map[uint32]string // hash -> node ID
|
|
sortedHashes []uint32 // sorted hash keys
|
|
nodes map[string]bool // active nodes
|
|
virtualNodes int // number of virtual nodes per physical node
|
|
}
|
|
|
|
// NewConsistentHashRing creates a new consistent hash ring with default configuration
|
|
func NewConsistentHashRing() *ConsistentHashRing {
|
|
return NewConsistentHashRingWithConfig(DefaultHashRingConfig())
|
|
}
|
|
|
|
// NewConsistentHashRingWithConfig creates a new consistent hash ring with custom configuration
|
|
func NewConsistentHashRingWithConfig(config HashRingConfig) *ConsistentHashRing {
|
|
virtualNodes := config.VirtualNodes
|
|
if virtualNodes == 0 {
|
|
virtualNodes = DefaultVirtualNodes
|
|
}
|
|
return &ConsistentHashRing{
|
|
ring: make(map[uint32]string),
|
|
nodes: make(map[string]bool),
|
|
virtualNodes: virtualNodes,
|
|
}
|
|
}
|
|
|
|
// AddNode adds a node to the hash ring
|
|
func (chr *ConsistentHashRing) AddNode(nodeID string) {
|
|
chr.mu.Lock()
|
|
defer chr.mu.Unlock()
|
|
|
|
if chr.nodes[nodeID] {
|
|
return // Node already exists
|
|
}
|
|
|
|
chr.nodes[nodeID] = true
|
|
|
|
// Add virtual nodes for better distribution
|
|
for i := 0; i < chr.virtualNodes; i++ {
|
|
virtualKey := fmt.Sprintf("%s:%d", nodeID, i)
|
|
hash := chr.hash(virtualKey)
|
|
chr.ring[hash] = nodeID
|
|
chr.sortedHashes = append(chr.sortedHashes, hash)
|
|
}
|
|
|
|
sort.Slice(chr.sortedHashes, func(i, j int) bool {
|
|
return chr.sortedHashes[i] < chr.sortedHashes[j]
|
|
})
|
|
}
|
|
|
|
// RemoveNode removes a node from the hash ring
|
|
func (chr *ConsistentHashRing) RemoveNode(nodeID string) {
|
|
chr.mu.Lock()
|
|
defer chr.mu.Unlock()
|
|
|
|
if !chr.nodes[nodeID] {
|
|
return // Node doesn't exist
|
|
}
|
|
|
|
delete(chr.nodes, nodeID)
|
|
|
|
// Remove all virtual nodes for this physical node
|
|
newHashes := make([]uint32, 0)
|
|
for _, hash := range chr.sortedHashes {
|
|
if chr.ring[hash] != nodeID {
|
|
newHashes = append(newHashes, hash)
|
|
} else {
|
|
delete(chr.ring, hash)
|
|
}
|
|
}
|
|
chr.sortedHashes = newHashes
|
|
}
|
|
|
|
// GetNode returns the node responsible for a given key
|
|
func (chr *ConsistentHashRing) GetNode(key string) string {
|
|
chr.mu.RLock()
|
|
defer chr.mu.RUnlock()
|
|
|
|
if len(chr.sortedHashes) == 0 {
|
|
return ""
|
|
}
|
|
|
|
hash := chr.hash(key)
|
|
|
|
// Find the first node with hash >= key hash (clockwise)
|
|
idx := sort.Search(len(chr.sortedHashes), func(i int) bool {
|
|
return chr.sortedHashes[i] >= hash
|
|
})
|
|
|
|
// Wrap around to the first node if we've gone past the end
|
|
if idx == len(chr.sortedHashes) {
|
|
idx = 0
|
|
}
|
|
|
|
return chr.ring[chr.sortedHashes[idx]]
|
|
}
|
|
|
|
// hash computes a hash for the given key
|
|
func (chr *ConsistentHashRing) hash(key string) uint32 {
|
|
h := sha256.Sum256([]byte(key))
|
|
return binary.BigEndian.Uint32(h[:4])
|
|
}
|
|
|
|
// GetNodes returns all active nodes in the ring
|
|
func (chr *ConsistentHashRing) GetNodes() []string {
|
|
chr.mu.RLock()
|
|
defer chr.mu.RUnlock()
|
|
|
|
nodes := make([]string, 0, len(chr.nodes))
|
|
for nodeID := range chr.nodes {
|
|
nodes = append(nodes, nodeID)
|
|
}
|
|
return nodes
|
|
}
|
|
|
|
// IsEmpty returns true if the ring has no nodes
|
|
func (chr *ConsistentHashRing) IsEmpty() bool {
|
|
chr.mu.RLock()
|
|
defer chr.mu.RUnlock()
|
|
|
|
return len(chr.nodes) == 0
|
|
}
|
|
|
|
// GetVirtualNodes returns the number of virtual nodes per physical node
|
|
func (chr *ConsistentHashRing) GetVirtualNodes() int {
|
|
return chr.virtualNodes
|
|
}
|