Files
aether/store/jetstream.go
Hugo Nijhuis b630258f60
All checks were successful
CI / build (pull_request) Successful in 17s
Handle malformed events during JetStream replay with proper error reporting
Add ReplayError and ReplayResult types to capture information about
malformed events encountered during replay. This allows callers to
inspect and handle corrupted data rather than having it silently skipped.

Key changes:
- Add ReplayError type with sequence number, raw data, and underlying error
- Add ReplayResult type containing both successfully parsed events and errors
- Add EventStoreWithErrors interface for stores that can report replay errors
- Implement GetEventsWithErrors on JetStreamEventStore
- Update GetEvents to maintain backward compatibility (still skips malformed)
- Add comprehensive unit tests for the new types

This addresses the issue of silent data loss during event-sourced replay
by giving callers visibility into data quality issues.

Closes #39

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 15:32:46 +01:00

315 lines
8.8 KiB
Go

package store
import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"git.flowmade.one/flowmade-one/aether"
	"github.com/nats-io/nats.go"
)
// JetStreamEventStore is an EventStore that persists events in NATS
// JetStream. It additionally satisfies EventStoreWithErrors, which lets
// callers see the malformed events encountered while replaying.
type JetStreamEventStore struct {
	// js is the JetStream context used for every publish and subscribe.
	js nats.JetStreamContext
	// streamName names the stream and is the first token of every subject.
	streamName string

	// mu serializes the version check in SaveEvent and guards versions.
	mu sync.Mutex
	// versions caches the latest known version, keyed by actor ID.
	versions map[string]int64
}
// NewJetStreamEventStore builds an event store on top of natsConn.
//
// It asks JetStream to add a file-backed stream called streamName that
// captures the subjects "<streamName>.events.>" and "<streamName>.snapshots.>",
// using the limits retention policy, a one-year MaxAge and a single replica.
// An AddStream error whose text contains "already exists" is tolerated; any
// other error, as well as a failure to obtain the JetStream context, is
// returned wrapped.
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
	js, err := natsConn.JetStream()
	if err != nil {
		return nil, fmt.Errorf("failed to get JetStream context: %w", err)
	}

	eventSubjects := fmt.Sprintf("%s.events.>", streamName)
	snapshotSubjects := fmt.Sprintf("%s.snapshots.>", streamName)
	cfg := nats.StreamConfig{
		Name:      streamName,
		Subjects:  []string{eventSubjects, snapshotSubjects},
		Storage:   nats.FileStorage,
		Retention: nats.LimitsPolicy,
		MaxAge:    365 * 24 * time.Hour, // retain for one year
		Replicas:  1,                    // raise for high availability
	}

	// NOTE(review): matching on the error text is brittle; confirm which
	// message the client actually reports when the stream is already present.
	if _, err := js.AddStream(&cfg); err != nil {
		if !strings.Contains(err.Error(), "already exists") {
			return nil, fmt.Errorf("failed to create stream: %w", err)
		}
	}

	store := &JetStreamEventStore{
		js:         js,
		streamName: streamName,
		versions:   make(map[string]int64),
	}
	return store, nil
}
// SaveEvent persists an event to JetStream.
//
// The event is JSON-encoded and published on
// "<stream>.events.<actorType>.<actorID>" (the last two tokens sanitized),
// with the event ID as the NATS message ID so JetStream can deduplicate
// re-publishes. The whole operation runs under jes.mu, so the version check
// and the publish cannot interleave with another SaveEvent on this store.
//
// It returns:
//   - an error if event is nil;
//   - *aether.VersionConflictError if event.Version is not strictly greater
//     than the latest known version for the actor;
//   - a wrapped error if the latest version cannot be determined, the event
//     cannot be marshaled, or the publish fails.
//
// On success the cached latest version for the actor becomes event.Version.
func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
	// Reject nil up front: every step below dereferences event, and a
	// library call should report bad input rather than panic.
	if event == nil {
		return fmt.Errorf("failed to save event: event is nil")
	}

	jes.mu.Lock()
	defer jes.mu.Unlock()

	// Get current latest version for this actor (cache first, then replay).
	currentVersion, err := jes.getLatestVersionLocked(event.ActorID)
	if err != nil {
		return fmt.Errorf("failed to get latest version: %w", err)
	}

	// Validate version is strictly greater than current.
	if event.Version <= currentVersion {
		return &aether.VersionConflictError{
			ActorID:          event.ActorID,
			AttemptedVersion: event.Version,
			CurrentVersion:   currentVersion,
		}
	}

	// Serialize event to JSON.
	data, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	// Create subject: stream.events.actorType.actorID
	subject := fmt.Sprintf("%s.events.%s.%s",
		jes.streamName,
		sanitizeSubject(extractActorType(event.ActorID)),
		sanitizeSubject(event.ActorID))

	// Publish with event ID as message ID for deduplication.
	if _, err := jes.js.Publish(subject, data, nats.MsgId(event.ID)); err != nil {
		return fmt.Errorf("failed to publish event to JetStream: %w", err)
	}

	// Only advance the cache once the publish has succeeded.
	jes.versions[event.ActorID] = event.Version
	return nil
}
// getLatestVersionLocked reports the highest event version known for actorID.
// The caller must already hold jes.mu.
//
// A cached value is returned as-is. On a cache miss the actor's events are
// replayed from version 0 and the maximum Version among the decoded events is
// cached and returned. When the replay yields no events the result is 0 and
// the cache is left untouched. Replay failures are returned unchanged.
func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) {
	if cached, hit := jes.versions[actorID]; hit {
		return cached, nil
	}

	// Cache miss: rebuild the answer from the stored events.
	replay, err := jes.getEventsWithErrorsInternal(actorID, 0)
	if err != nil {
		return 0, err
	}
	if len(replay.Events) == 0 {
		return 0, nil
	}

	var highest int64
	for i := range replay.Events {
		if v := replay.Events[i].Version; v > highest {
			highest = v
		}
	}

	jes.versions[actorID] = highest
	return highest, nil
}
// GetEvents returns the stored events for actorID whose Version is strictly
// greater than fromVersion.
//
// Malformed events are dropped without notice so that existing callers keep
// working; call GetEventsWithErrors to be told about them.
func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
	replay, err := jes.getEventsWithErrorsInternal(actorID, fromVersion)
	if err != nil {
		return nil, err
	}

	// Only the decoded events are exposed; replay.Errors is discarded here.
	return replay.Events, nil
}
// GetEventsWithErrors returns the events for actorID with Version greater
// than fromVersion together with a record of every malformed event met during
// the replay, leaving it to the caller to decide how to treat corrupted data.
func (jes *JetStreamEventStore) GetEventsWithErrors(actorID string, fromVersion int64) (*aether.ReplayResult, error) {
	replay, err := jes.getEventsWithErrorsInternal(actorID, fromVersion)
	return replay, err
}
// getEventsWithErrorsInternal replays the events stored for actorID and
// collects both the decoded events and a record of every message that could
// not be decoded.
//
// It pull-subscribes (empty durable name) to the actor's event subject,
// "<stream>.events.<actorType>.<actorID>" with the last two tokens sanitized,
// and fetches messages in batches, waiting up to one second per batch. The
// replay ends when a fetch times out or returns fewer messages than requested.
// Every received message is acknowledged, malformed ones included.
//
// Only events whose Version is strictly greater than fromVersion are returned.
// A message that fails JSON decoding is reported in ReplayResult.Errors with
// its raw payload, the decode error and its stream sequence number (0 when the
// message metadata is unavailable).
//
// A non-nil error is returned only when the consumer cannot be created or a
// fetch fails for a reason other than a timeout; the result is nil in that
// case.
//
// NOTE(review): events are not checked against actorID after decoding, so two
// actor IDs that sanitize to the same subject would share a replay — confirm
// whether that can occur in practice.
func (jes *JetStreamEventStore) getEventsWithErrorsInternal(actorID string, fromVersion int64) (*aether.ReplayResult, error) {
	// batchSize is the number of messages requested per fetch; a shorter batch
	// means the subject has been drained.
	const batchSize = 100

	// Create subject filter for this actor.
	subject := fmt.Sprintf("%s.events.%s.%s",
		jes.streamName,
		sanitizeSubject(extractActorType(actorID)),
		sanitizeSubject(actorID))

	// Create consumer to read events.
	consumer, err := jes.js.PullSubscribe(subject, "")
	if err != nil {
		return nil, fmt.Errorf("failed to create consumer: %w", err)
	}
	defer consumer.Unsubscribe()

	// Both slices start empty but non-nil, as callers have always received.
	result := &aether.ReplayResult{
		Events: make([]*aether.Event, 0),
		Errors: make([]aether.ReplayError, 0),
	}

	for {
		msgs, err := consumer.Fetch(batchSize, nats.MaxWait(time.Second))
		if err != nil {
			// errors.Is rather than == so that a wrapped timeout still ends
			// the replay cleanly instead of surfacing as a failure.
			if errors.Is(err, nats.ErrTimeout) {
				break // No more messages
			}
			return nil, fmt.Errorf("failed to fetch messages: %w", err)
		}

		for _, msg := range msgs {
			var event aether.Event
			if err := json.Unmarshal(msg.Data, &event); err != nil {
				// Record the failure with as much context as is available.
				// A metadata lookup error just leaves the sequence at 0.
				var seqNum uint64
				if metadata, _ := msg.Metadata(); metadata != nil {
					seqNum = metadata.Sequence.Stream
				}
				result.Errors = append(result.Errors, aether.ReplayError{
					SequenceNumber: seqNum,
					RawData:        msg.Data,
					Err:            err,
				})
				// Still ack to prevent redelivery. The ack result is ignored:
				// the subscription is torn down when this function returns.
				_ = msg.Ack()
				continue
			}

			// Filter by version.
			if event.Version > fromVersion {
				result.Events = append(result.Events, &event)
			}
			// Best-effort ack, for the same reason as above.
			_ = msg.Ack()
		}

		if len(msgs) < batchSize {
			break // Short batch: nothing further to read
		}
	}

	return result, nil
}
// GetLatestVersion reports the highest Version among the events stored for
// actorID, or 0 when there are none. It always replays the events through
// GetEvents and neither reads nor updates the version cache. Any error from
// GetEvents is returned unchanged.
func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) {
	events, err := jes.GetEvents(actorID, 0)
	if err != nil {
		return 0, err
	}

	// With no events the loop body never runs and the result stays 0.
	var latest int64
	for i := range events {
		if v := events[i].Version; v > latest {
			latest = v
		}
	}
	return latest, nil
}
// GetLatestSnapshot returns the most recent snapshot stored for actorID.
//
// It pull-subscribes to "<stream>.snapshots.<actorType>.<actorID>" (the last
// two tokens sanitized) with the DeliverLast option and fetches a single
// message, waiting up to one second.
//
// It returns an error when the consumer cannot be created, when the fetch
// fails, when no message arrives in time ("no snapshot found for actor ..."),
// or when the payload is not valid JSON for an ActorSnapshot. The message is
// acknowledged only after it has been decoded successfully.
func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
	// Create subject for snapshots.
	subject := fmt.Sprintf("%s.snapshots.%s.%s",
		jes.streamName,
		sanitizeSubject(extractActorType(actorID)),
		sanitizeSubject(actorID))

	// Try to get the latest snapshot.
	consumer, err := jes.js.PullSubscribe(subject, "", nats.DeliverLast())
	if err != nil {
		return nil, fmt.Errorf("failed to create snapshot consumer: %w", err)
	}
	defer consumer.Unsubscribe()

	msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
	if err != nil {
		// errors.Is rather than == so that a wrapped timeout is still
		// reported as "no snapshot" instead of a fetch failure.
		if errors.Is(err, nats.ErrTimeout) {
			return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
		}
		return nil, fmt.Errorf("failed to fetch snapshot: %w", err)
	}
	if len(msgs) == 0 {
		return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
	}

	var snapshot aether.ActorSnapshot
	if err := json.Unmarshal(msgs[0].Data, &snapshot); err != nil {
		return nil, fmt.Errorf("failed to unmarshal snapshot: %w", err)
	}

	// Best-effort ack: the subscription is torn down when this function
	// returns, so a failed ack has no further effect here.
	_ = msgs[0].Ack()
	return &snapshot, nil
}
// SaveSnapshot saves a snapshot of actor state.
//
// The snapshot is JSON-encoded and published on
// "<stream>.snapshots.<actorType>.<actorID>" (the last two tokens sanitized).
// No message ID is attached to the publish.
//
// It returns an error if snapshot is nil, if it cannot be marshaled, or if
// the publish fails.
func (jes *JetStreamEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error {
	// A nil pointer marshals to "null" without error, so without this guard
	// the failure would only show up as a panic when ActorID is read below.
	if snapshot == nil {
		return fmt.Errorf("failed to save snapshot: snapshot is nil")
	}

	// Serialize snapshot to JSON.
	data, err := json.Marshal(snapshot)
	if err != nil {
		return fmt.Errorf("failed to marshal snapshot: %w", err)
	}

	// Create subject for snapshots.
	subject := fmt.Sprintf("%s.snapshots.%s.%s",
		jes.streamName,
		sanitizeSubject(extractActorType(snapshot.ActorID)),
		sanitizeSubject(snapshot.ActorID))

	// Publish snapshot.
	if _, err := jes.js.Publish(subject, data); err != nil {
		return fmt.Errorf("failed to publish snapshot to JetStream: %w", err)
	}
	return nil
}
// Helper functions
// extractActorType derives the actor type from an actor ID: it returns the
// part of actorID that precedes the first '-' found after the first byte.
// When there is no such '-' (including for an empty ID) it returns "unknown".
func extractActorType(actorID string) string {
	// Start at 1 so that a leading '-' never produces an empty type. Scanning
	// bytes is enough because '-' is a single-byte ASCII character.
	for pos := 1; pos < len(actorID); pos++ {
		if actorID[pos] == '-' {
			return actorID[:pos]
		}
	}
	return "unknown"
}
// sanitizeSubject prepares s for use in a NATS subject by replacing every
// space, '.', '*' and '>' with an underscore. All other characters are left
// unchanged.
func sanitizeSubject(s string) string {
	replacer := strings.NewReplacer(
		" ", "_",
		".", "_",
		"*", "_",
		">", "_",
	)
	return replacer.Replace(s)
}
// Compile-time check that *JetStreamEventStore implements
// aether.EventStoreWithErrors. The build fails here if a required method is
// missing or has the wrong signature; nothing is allocated at run time.
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)