Merge pull request '[Issue #39] Handle malformed events during JetStream replay with proper error reporting' (#41) from issue-39-malformed-events into main
All checks were successful
CI / build (push) Successful in 16s

This commit was merged in pull request #41.
This commit is contained in:
2026-01-10 17:48:05 +00:00
3 changed files with 218 additions and 13 deletions

View File

@@ -28,6 +28,39 @@ func (e *VersionConflictError) Unwrap() error {
return ErrVersionConflict
}
// ReplayError captures information about a malformed event encountered during replay.
// This allows callers to inspect and handle corrupted data without losing context.
type ReplayError struct {
// SequenceNumber is the sequence number of the message in the stream (if available)
SequenceNumber uint64
// RawData is the raw bytes that could not be unmarshaled
RawData []byte
// Err is the underlying unmarshal error
Err error
}
func (e *ReplayError) Error() string {
return fmt.Sprintf("failed to unmarshal event at sequence %d: %v", e.SequenceNumber, e.Err)
}
func (e *ReplayError) Unwrap() error {
return e.Err
}
// ReplayResult contains the results of replaying events, including any errors encountered.
// This allows callers to decide how to handle malformed events rather than silently skipping them.
type ReplayResult struct {
// Events contains the successfully unmarshaled events
Events []*Event
// Errors contains information about any malformed events encountered
Errors []ReplayError
}
// HasErrors returns true if any malformed events were encountered during replay
func (r *ReplayResult) HasErrors() bool {
return len(r.Errors) > 0
}
// Event represents a domain event in the system
type Event struct {
ID string `json:"id"`
@@ -174,6 +207,18 @@ type EventStore interface {
GetLatestVersion(actorID string) (int64, error)
}
// EventStoreWithErrors extends EventStore with methods that report malformed events.
// Stores that may encounter corrupted data during replay (e.g., JetStream) should
// implement this interface to give callers visibility into data quality issues.
type EventStoreWithErrors interface {
EventStore
// GetEventsWithErrors retrieves events for an actor and reports any malformed
// events encountered. This method allows callers to decide how to handle
// corrupted data rather than silently skipping it.
GetEventsWithErrors(actorID string, fromVersion int64) (*ReplayResult, error)
}
// SnapshotStore extends EventStore with snapshot capabilities
type SnapshotStore interface {
EventStore

View File

@@ -1208,3 +1208,130 @@ func TestEvent_MetadataAllHelpersRoundTrip(t *testing.T) {
t.Errorf("GetSpanID mismatch: got %q", decoded.GetSpanID())
}
}
// Tests for ReplayError and ReplayResult types
func TestReplayError_Error(t *testing.T) {
err := &ReplayError{
SequenceNumber: 42,
RawData: []byte(`invalid json`),
Err: json.Unmarshal([]byte(`{`), &struct{}{}),
}
errMsg := err.Error()
if !strings.Contains(errMsg, "42") {
t.Errorf("expected error message to contain sequence number, got: %s", errMsg)
}
if !strings.Contains(errMsg, "unmarshal") || !strings.Contains(errMsg, "failed") {
t.Errorf("expected error message to contain 'failed' and 'unmarshal', got: %s", errMsg)
}
}
func TestReplayError_Unwrap(t *testing.T) {
innerErr := json.Unmarshal([]byte(`{`), &struct{}{})
err := &ReplayError{
SequenceNumber: 1,
RawData: []byte(`{`),
Err: innerErr,
}
unwrapped := err.Unwrap()
if unwrapped != innerErr {
t.Errorf("expected Unwrap to return inner error")
}
}
func TestReplayResult_HasErrors(t *testing.T) {
tests := []struct {
name string
result *ReplayResult
expected bool
}{
{
name: "no errors",
result: &ReplayResult{Events: []*Event{}, Errors: []ReplayError{}},
expected: false,
},
{
name: "nil errors slice",
result: &ReplayResult{Events: []*Event{}, Errors: nil},
expected: false,
},
{
name: "has errors",
result: &ReplayResult{
Events: []*Event{},
Errors: []ReplayError{
{SequenceNumber: 1, RawData: []byte(`bad`), Err: nil},
},
},
expected: true,
},
{
name: "has events and errors",
result: &ReplayResult{
Events: []*Event{{ID: "evt-1"}},
Errors: []ReplayError{
{SequenceNumber: 2, RawData: []byte(`bad`), Err: nil},
},
},
expected: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.result.HasErrors(); got != tt.expected {
t.Errorf("HasErrors() = %v, want %v", got, tt.expected)
}
})
}
}
func TestReplayResult_EmptyResult(t *testing.T) {
result := &ReplayResult{
Events: []*Event{},
Errors: []ReplayError{},
}
if result.HasErrors() {
t.Error("expected HasErrors() to return false for empty result")
}
if len(result.Events) != 0 {
t.Errorf("expected 0 events, got %d", len(result.Events))
}
}
func TestReplayError_WithZeroSequence(t *testing.T) {
err := &ReplayError{
SequenceNumber: 0,
RawData: []byte(`corrupted`),
Err: json.Unmarshal([]byte(`not-json`), &struct{}{}),
}
errMsg := err.Error()
if !strings.Contains(errMsg, "sequence 0") {
t.Errorf("expected error message to contain 'sequence 0', got: %s", errMsg)
}
}
func TestReplayError_WithLargeRawData(t *testing.T) {
largeData := make([]byte, 1024*1024) // 1MB
for i := range largeData {
largeData[i] = 'x'
}
err := &ReplayError{
SequenceNumber: 999,
RawData: largeData,
Err: json.Unmarshal(largeData, &struct{}{}),
}
// Should be able to create the error without issues
if len(err.RawData) != 1024*1024 {
t.Errorf("expected RawData to be preserved, got length %d", len(err.RawData))
}
// Error() should still work
_ = err.Error()
}

View File

@@ -33,7 +33,8 @@ func DefaultJetStreamConfig() JetStreamConfig {
}
}
// JetStreamEventStore implements EventStore using NATS JetStream for persistence
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
// It also implements EventStoreWithErrors to report malformed events during replay.
type JetStreamEventStore struct {
js nats.JetStreamContext
streamName string
@@ -139,18 +140,18 @@ func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, e
return version, nil
}
// Fetch from JetStream
events, err := jes.getEventsInternal(actorID, 0)
// Fetch from JetStream - use internal method that returns result
result, err := jes.getEventsWithErrorsInternal(actorID, 0)
if err != nil {
return 0, err
}
if len(events) == 0 {
if len(result.Events) == 0 {
return 0, nil
}
latestVersion := int64(0)
for _, event := range events {
for _, event := range result.Events {
if event.Version > latestVersion {
latestVersion = event.Version
}
@@ -162,13 +163,27 @@ func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, e
return latestVersion, nil
}
// GetEvents retrieves all events for an actor since a version
// GetEvents retrieves all events for an actor since a version.
// Note: This method silently skips malformed events for backward compatibility.
// Use GetEventsWithErrors to receive information about malformed events.
func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
return jes.getEventsInternal(actorID, fromVersion)
result, err := jes.getEventsWithErrorsInternal(actorID, fromVersion)
if err != nil {
return nil, err
}
return result.Events, nil
}
// getEventsInternal is the internal implementation of GetEvents
func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion int64) ([]*aether.Event, error) {
// GetEventsWithErrors retrieves events for an actor and reports any malformed
// events encountered. This method allows callers to decide how to handle
// corrupted data rather than silently skipping it.
func (jes *JetStreamEventStore) GetEventsWithErrors(actorID string, fromVersion int64) (*aether.ReplayResult, error) {
return jes.getEventsWithErrorsInternal(actorID, fromVersion)
}
// getEventsWithErrorsInternal is the internal implementation that tracks both
// successfully parsed events and errors for malformed events.
func (jes *JetStreamEventStore) getEventsWithErrorsInternal(actorID string, fromVersion int64) (*aether.ReplayResult, error) {
// Create subject filter for this actor
subject := fmt.Sprintf("%s.events.%s.%s",
jes.streamName,
@@ -182,7 +197,10 @@ func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion in
}
defer consumer.Unsubscribe()
var events []*aether.Event
result := &aether.ReplayResult{
Events: make([]*aether.Event, 0),
Errors: make([]aether.ReplayError, 0),
}
// Fetch messages in batches
for {
@@ -197,12 +215,24 @@ func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion in
for _, msg := range msgs {
var event aether.Event
if err := json.Unmarshal(msg.Data, &event); err != nil {
continue // Skip malformed events
// Record the error with context instead of silently skipping
metadata, _ := msg.Metadata()
seqNum := uint64(0)
if metadata != nil {
seqNum = metadata.Sequence.Stream
}
result.Errors = append(result.Errors, aether.ReplayError{
SequenceNumber: seqNum,
RawData: msg.Data,
Err: err,
})
msg.Ack() // Still ack to prevent redelivery
continue
}
// Filter by version
if event.Version > fromVersion {
events = append(events, &event)
result.Events = append(result.Events, &event)
}
msg.Ack()
@@ -213,7 +243,7 @@ func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion in
}
}
return events, nil
return result, nil
}
// GetLatestVersion returns the latest version for an actor
@@ -316,3 +346,6 @@ func sanitizeSubject(s string) string {
s = strings.ReplaceAll(s, ">", "_")
return s
}
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)