diff --git a/event.go b/event.go index 8d76ac5..94ea58c 100644 --- a/event.go +++ b/event.go @@ -28,6 +28,39 @@ func (e *VersionConflictError) Unwrap() error { return ErrVersionConflict } +// ReplayError captures information about a malformed event encountered during replay. +// This allows callers to inspect and handle corrupted data without losing context. +type ReplayError struct { + // SequenceNumber is the sequence number of the message in the stream (if available) + SequenceNumber uint64 + // RawData is the raw bytes that could not be unmarshaled + RawData []byte + // Err is the underlying unmarshal error + Err error +} + +func (e *ReplayError) Error() string { + return fmt.Sprintf("failed to unmarshal event at sequence %d: %v", e.SequenceNumber, e.Err) +} + +func (e *ReplayError) Unwrap() error { + return e.Err +} + +// ReplayResult contains the results of replaying events, including any errors encountered. +// This allows callers to decide how to handle malformed events rather than silently skipping them. +type ReplayResult struct { + // Events contains the successfully unmarshaled events + Events []*Event + // Errors contains information about any malformed events encountered + Errors []ReplayError +} + +// HasErrors returns true if any malformed events were encountered during replay +func (r *ReplayResult) HasErrors() bool { + return len(r.Errors) > 0 +} + // Event represents a domain event in the system type Event struct { ID string `json:"id"` @@ -174,6 +207,18 @@ type EventStore interface { GetLatestVersion(actorID string) (int64, error) } +// EventStoreWithErrors extends EventStore with methods that report malformed events. +// Stores that may encounter corrupted data during replay (e.g., JetStream) should +// implement this interface to give callers visibility into data quality issues. +type EventStoreWithErrors interface { + EventStore + + // GetEventsWithErrors retrieves events for an actor and reports any malformed + // events encountered. This method allows callers to decide how to handle + // corrupted data rather than silently skipping it. + GetEventsWithErrors(actorID string, fromVersion int64) (*ReplayResult, error) +} + // SnapshotStore extends EventStore with snapshot capabilities type SnapshotStore interface { EventStore diff --git a/event_test.go b/event_test.go index 777c7a9..6590daf 100644 --- a/event_test.go +++ b/event_test.go @@ -1208,3 +1208,130 @@ func TestEvent_MetadataAllHelpersRoundTrip(t *testing.T) { t.Errorf("GetSpanID mismatch: got %q", decoded.GetSpanID()) } } + +// Tests for ReplayError and ReplayResult types + +func TestReplayError_Error(t *testing.T) { + err := &ReplayError{ + SequenceNumber: 42, + RawData: []byte(`invalid json`), + Err: json.Unmarshal([]byte(`{`), &struct{}{}), + } + + errMsg := err.Error() + if !strings.Contains(errMsg, "42") { + t.Errorf("expected error message to contain sequence number, got: %s", errMsg) + } + if !strings.Contains(errMsg, "unmarshal") || !strings.Contains(errMsg, "failed") { + t.Errorf("expected error message to contain 'failed' and 'unmarshal', got: %s", errMsg) + } +} + +func TestReplayError_Unwrap(t *testing.T) { + innerErr := json.Unmarshal([]byte(`{`), &struct{}{}) + err := &ReplayError{ + SequenceNumber: 1, + RawData: []byte(`{`), + Err: innerErr, + } + + unwrapped := err.Unwrap() + if unwrapped != innerErr { + t.Errorf("expected Unwrap to return inner error") + } +} + +func TestReplayResult_HasErrors(t *testing.T) { + tests := []struct { + name string + result *ReplayResult + expected bool + }{ + { + name: "no errors", + result: &ReplayResult{Events: []*Event{}, Errors: []ReplayError{}}, + expected: false, + }, + { + name: "nil errors slice", + result: &ReplayResult{Events: []*Event{}, Errors: nil}, + expected: false, + }, + { + name: "has errors", + result: &ReplayResult{ + Events: []*Event{}, + Errors: []ReplayError{ + {SequenceNumber: 1, RawData: []byte(`bad`), Err: nil}, + }, + }, + expected: true, + }, + { + name: "has events and errors", + result: &ReplayResult{ + Events: []*Event{{ID: "evt-1"}}, + Errors: []ReplayError{ + {SequenceNumber: 2, RawData: []byte(`bad`), Err: nil}, + }, + }, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.result.HasErrors(); got != tt.expected { + t.Errorf("HasErrors() = %v, want %v", got, tt.expected) + } + }) + } +} + +func TestReplayResult_EmptyResult(t *testing.T) { + result := &ReplayResult{ + Events: []*Event{}, + Errors: []ReplayError{}, + } + + if result.HasErrors() { + t.Error("expected HasErrors() to return false for empty result") + } + if len(result.Events) != 0 { + t.Errorf("expected 0 events, got %d", len(result.Events)) + } +} + +func TestReplayError_WithZeroSequence(t *testing.T) { + err := &ReplayError{ + SequenceNumber: 0, + RawData: []byte(`corrupted`), + Err: json.Unmarshal([]byte(`not-json`), &struct{}{}), + } + + errMsg := err.Error() + if !strings.Contains(errMsg, "sequence 0") { + t.Errorf("expected error message to contain 'sequence 0', got: %s", errMsg) + } +} + +func TestReplayError_WithLargeRawData(t *testing.T) { + largeData := make([]byte, 1024*1024) // 1MB + for i := range largeData { + largeData[i] = 'x' + } + + err := &ReplayError{ + SequenceNumber: 999, + RawData: largeData, + Err: json.Unmarshal(largeData, &struct{}{}), + } + + // Should be able to create the error without issues + if len(err.RawData) != 1024*1024 { + t.Errorf("expected RawData to be preserved, got length %d", len(err.RawData)) + } + + // Error() should still work + _ = err.Error() +} diff --git a/store/jetstream.go b/store/jetstream.go index f30967e..9e66eed 100644 --- a/store/jetstream.go +++ b/store/jetstream.go @@ -33,7 +33,8 @@ func DefaultJetStreamConfig() JetStreamConfig { } } -// JetStreamEventStore implements EventStore using NATS JetStream for persistence +// JetStreamEventStore implements EventStore using NATS JetStream for persistence. +// It also implements EventStoreWithErrors to report malformed events during replay. type JetStreamEventStore struct { js nats.JetStreamContext streamName string @@ -139,18 +140,18 @@ func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, e return version, nil } - // Fetch from JetStream - events, err := jes.getEventsInternal(actorID, 0) + // Fetch from JetStream - use internal method that returns result + result, err := jes.getEventsWithErrorsInternal(actorID, 0) if err != nil { return 0, err } - if len(events) == 0 { + if len(result.Events) == 0 { return 0, nil } latestVersion := int64(0) - for _, event := range events { + for _, event := range result.Events { if event.Version > latestVersion { latestVersion = event.Version } @@ -162,13 +163,27 @@ func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, e return latestVersion, nil } -// GetEvents retrieves all events for an actor since a version +// GetEvents retrieves all events for an actor since a version. +// Note: This method silently skips malformed events for backward compatibility. +// Use GetEventsWithErrors to receive information about malformed events. func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) { - return jes.getEventsInternal(actorID, fromVersion) + result, err := jes.getEventsWithErrorsInternal(actorID, fromVersion) + if err != nil { + return nil, err + } + return result.Events, nil } -// getEventsInternal is the internal implementation of GetEvents -func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion int64) ([]*aether.Event, error) { +// GetEventsWithErrors retrieves events for an actor and reports any malformed +// events encountered. This method allows callers to decide how to handle +// corrupted data rather than silently skipping it. +func (jes *JetStreamEventStore) GetEventsWithErrors(actorID string, fromVersion int64) (*aether.ReplayResult, error) { + return jes.getEventsWithErrorsInternal(actorID, fromVersion) +} + +// getEventsWithErrorsInternal is the internal implementation that tracks both +// successfully parsed events and errors for malformed events. +func (jes *JetStreamEventStore) getEventsWithErrorsInternal(actorID string, fromVersion int64) (*aether.ReplayResult, error) { // Create subject filter for this actor subject := fmt.Sprintf("%s.events.%s.%s", jes.streamName, @@ -182,7 +197,10 @@ func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion in } defer consumer.Unsubscribe() - var events []*aether.Event + result := &aether.ReplayResult{ + Events: make([]*aether.Event, 0), + Errors: make([]aether.ReplayError, 0), + } // Fetch messages in batches for { @@ -197,12 +215,24 @@ func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion in for _, msg := range msgs { var event aether.Event if err := json.Unmarshal(msg.Data, &event); err != nil { - continue // Skip malformed events + // Record the error with context instead of silently skipping + metadata, _ := msg.Metadata() + seqNum := uint64(0) + if metadata != nil { + seqNum = metadata.Sequence.Stream + } + result.Errors = append(result.Errors, aether.ReplayError{ + SequenceNumber: seqNum, + RawData: msg.Data, + Err: err, + }) + msg.Ack() // Still ack to prevent redelivery + continue } // Filter by version if event.Version > fromVersion { - events = append(events, &event) + result.Events = append(result.Events, &event) } msg.Ack() @@ -213,7 +243,7 @@ func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion in } } - return events, nil + return result, nil } // GetLatestVersion returns the latest version for an actor @@ -316,3 +346,6 @@ func sanitizeSubject(s string) string { s = strings.ReplaceAll(s, ">", "_") return s } + +// Compile-time check that JetStreamEventStore implements EventStoreWithErrors +var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)