Files
aether/cluster/discovery.go
Hugo Nijhuis c757bb76f3
All checks were successful
CI / build (pull_request) Successful in 16s
CI / build (push) Successful in 15s
Make configuration values injectable rather than hardcoded
Add config structs with sensible defaults for tunable parameters:
- JetStreamConfig for stream retention (1 year) and replica count (1)
- HashRingConfig for virtual nodes per physical node (150)
- ShardConfig for shard count (1024) and replication factor (1)

Each component gets a new WithConfig constructor that accepts custom
configuration, while the original constructors continue to work with
defaults. Zero values in configs fall back to defaults for backward
compatibility.

Closes #38

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 15:33:56 +01:00

119 lines
2.5 KiB
Go

package cluster
import (
"context"
"encoding/json"
"time"
"github.com/nats-io/nats.go"
)
// NodeDiscovery tracks cluster membership over NATS: it publishes this
// node's own status on the discovery subject and relays the updates it
// receives there to a channel.
type NodeDiscovery struct {
	// nodeID is the identifier this node was created with.
	nodeID string
	// nodeInfo is the record that gets published in every announcement.
	nodeInfo *NodeInfo
	// natsConn is the connection used for both publishing and subscribing.
	natsConn *nats.Conn
	// heartbeat is the interval between periodic announcements made by Start.
	// timeout is not read by the code in this file.
	// NOTE(review): presumably the window after which a silent peer is
	// treated as gone — confirm against the consumer of this value.
	heartbeat, timeout time.Duration
	// updates carries decoded announcements to the reader of GetUpdates.
	updates chan NodeUpdate
	// ctx ends the Start loop and releases blocked channel sends once cancelled.
	ctx context.Context
}
// NewNodeDiscovery builds a discovery service for the node identified by
// nodeID, using natsConn for messaging and ctx to control its lifetime.
//
// The node starts out as NodeStatusActive with a capacity of 1000, zero
// load and empty metadata. Heartbeats are sent every 30 seconds, the
// timeout is set to 90 seconds, and the updates channel buffers up to 100
// entries. Nothing is published or subscribed until Start is called.
func NewNodeDiscovery(nodeID string, natsConn *nats.Conn, ctx context.Context) *NodeDiscovery {
	return &NodeDiscovery{
		nodeID: nodeID,
		nodeInfo: &NodeInfo{
			ID:       nodeID,
			Status:   NodeStatusActive,
			Capacity: 1000, // default capacity
			Load:     0,
			LastSeen: time.Now(),
			Metadata: make(map[string]string),
		},
		natsConn:  natsConn,
		heartbeat: 30 * time.Second,
		timeout:   90 * time.Second,
		updates:   make(chan NodeUpdate, 100),
		ctx:       ctx,
	}
}
// Start announces this node as joined, subscribes to the
// "aether.discovery" subject and then blocks, publishing a NodeUpdated
// heartbeat every nd.heartbeat until nd.ctx is cancelled.
//
// Announcements received on the subject are decoded and forwarded to the
// channel returned by GetUpdates; payloads that fail to decode are dropped.
// When the context is cancelled a NodeLeft announcement is published, and
// the subscription and ticker are released before Start returns.
func (nd *NodeDiscovery) Start() {
	// Announce this node joining.
	nd.announceNode(NodeJoined)

	// Start heartbeat.
	ticker := time.NewTicker(nd.heartbeat)
	defer ticker.Stop()

	// Subscribe to node announcements.
	sub, err := nd.natsConn.Subscribe("aether.discovery", func(msg *nats.Msg) {
		var update NodeUpdate
		if err := json.Unmarshal(msg.Data, &update); err != nil {
			return
		}
		// Block until the consumer takes the update, but bail out on
		// cancellation so the handler cannot hang on a full channel forever.
		select {
		case nd.updates <- update:
		case <-nd.ctx.Done():
		}
	})
	if err == nil {
		// Remove the subscription when Start returns; otherwise the handler
		// stays registered on the connection after the service has stopped.
		// The Unsubscribe error is ignored: Start has no way to report it
		// and the service is shutting down anyway.
		defer func() { _ = sub.Unsubscribe() }()
	}
	// If Subscribe failed, Start has no error return to surface it, so the
	// node keeps heartbeating without receiving updates, as before.

	for {
		select {
		case <-ticker.C:
			nd.announceNode(NodeUpdated)
		case <-nd.ctx.Done():
			nd.announceNode(NodeLeft)
			return
		}
	}
}
// GetUpdates exposes the receive-only side of the channel on which decoded
// cluster announcements are delivered.
func (nd *NodeDiscovery) GetUpdates() <-chan NodeUpdate {
	var recvOnly <-chan NodeUpdate = nd.updates
	return recvOnly
}
// GetNodeInfo returns the NodeInfo record describing this node. The
// returned pointer is the same one used for announcements, not a copy.
func (nd *NodeDiscovery) GetNodeInfo() *NodeInfo {
	info := nd.nodeInfo
	return info
}
// UpdateLoad records load as this node's current load on the NodeInfo
// record that announceNode publishes. No locking is performed here.
func (nd *NodeDiscovery) UpdateLoad(load float64) {
	info := nd.nodeInfo
	info.Load = load
}
// UpdateVMCount records count as the number of VMs on this node, on the
// NodeInfo record that announceNode publishes. No locking is performed here.
func (nd *NodeDiscovery) UpdateVMCount(count int) {
	info := nd.nodeInfo
	info.VMCount = count
}
// announceNode refreshes the node's LastSeen timestamp and publishes the
// node record, tagged with updateType, as JSON on "aether.discovery".
// Publishing is best-effort: an encoding failure skips the publish, and
// the result of Publish itself is not inspected.
func (nd *NodeDiscovery) announceNode(updateType NodeUpdateType) {
	nd.nodeInfo.LastSeen = time.Now()

	payload, err := json.Marshal(NodeUpdate{
		Type: updateType,
		Node: nd.nodeInfo,
	})
	if err == nil {
		nd.natsConn.Publish("aether.discovery", payload)
	}
}
// Stop tells the cluster that this node is leaving by publishing a
// NodeLeft announcement. It does not itself end the loop run by Start,
// which exits only when the context given to NewNodeDiscovery is cancelled.
func (nd *NodeDiscovery) Stop() {
	var leaving NodeUpdateType = NodeLeft
	nd.announceNode(leaving)
}