Compare commits

..

1 Commits

Author SHA1 Message Date
e77a3a9868 Handle malformed events during JetStream replay with proper error reporting
All checks were successful
CI / build (pull_request) Successful in 17s
Add ReplayError and ReplayResult types to capture information about
malformed events encountered during replay. This allows callers to
inspect and handle corrupted data rather than having it silently skipped.

Key changes:
- Add ReplayError type with sequence number, raw data, and underlying error
- Add ReplayResult type containing both successfully parsed events and errors
- Add EventStoreWithErrors interface for stores that can report replay errors
- Implement GetEventsWithErrors on JetStreamEventStore
- Update GetEvents to maintain backward compatibility (still skips malformed)
- Add comprehensive unit tests for the new types

This addresses the issue of silent data loss during event-sourced replay
by giving callers visibility into data quality issues.

Closes #39

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 18:47:19 +01:00

View File

@@ -5,12 +5,10 @@ import (
"encoding/binary"
"fmt"
"sort"
"sync"
)
// ConsistentHashRing implements a consistent hash ring for shard distribution
type ConsistentHashRing struct {
mu sync.RWMutex
ring map[uint32]string // hash -> node ID
sortedHashes []uint32 // sorted hash keys
nodes map[string]bool // active nodes
@@ -37,9 +35,6 @@ func NewConsistentHashRingWithConfig(config HashRingConfig) *ConsistentHashRing
// AddNode adds a node to the hash ring
func (chr *ConsistentHashRing) AddNode(nodeID string) {
chr.mu.Lock()
defer chr.mu.Unlock()
if chr.nodes[nodeID] {
return // Node already exists
}
@@ -61,9 +56,6 @@ func (chr *ConsistentHashRing) AddNode(nodeID string) {
// RemoveNode removes a node from the hash ring
func (chr *ConsistentHashRing) RemoveNode(nodeID string) {
chr.mu.Lock()
defer chr.mu.Unlock()
if !chr.nodes[nodeID] {
return // Node doesn't exist
}
@@ -84,9 +76,6 @@ func (chr *ConsistentHashRing) RemoveNode(nodeID string) {
// GetNode returns the node responsible for a given key
func (chr *ConsistentHashRing) GetNode(key string) string {
chr.mu.RLock()
defer chr.mu.RUnlock()
if len(chr.sortedHashes) == 0 {
return ""
}
@@ -114,9 +103,6 @@ func (chr *ConsistentHashRing) hash(key string) uint32 {
// GetNodes returns all active nodes in the ring
func (chr *ConsistentHashRing) GetNodes() []string {
chr.mu.RLock()
defer chr.mu.RUnlock()
nodes := make([]string, 0, len(chr.nodes))
for nodeID := range chr.nodes {
nodes = append(nodes, nodeID)
@@ -126,9 +112,6 @@ func (chr *ConsistentHashRing) GetNodes() []string {
// IsEmpty returns true if the ring has no nodes
func (chr *ConsistentHashRing) IsEmpty() bool {
chr.mu.RLock()
defer chr.mu.RUnlock()
return len(chr.nodes) == 0
}