[Issue #39] Handle malformed events during JetStream replay with proper error reporting #41
45
event.go
45
event.go
@@ -28,6 +28,39 @@ func (e *VersionConflictError) Unwrap() error {
|
|||||||
return ErrVersionConflict
|
return ErrVersionConflict
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReplayError captures information about a malformed event encountered during replay.
// It keeps the raw payload and the stream position alongside the decode failure so
// callers can inspect and handle corrupted data without losing context.
type ReplayError struct {
	// SequenceNumber is the sequence number of the message in the stream
	// (zero if it was not available).
	SequenceNumber uint64
	// RawData is the raw bytes that could not be unmarshaled.
	RawData []byte
	// Err is the underlying unmarshal error. It may be nil.
	Err error
}

// Error implements the error interface.
//
// The message always names the sequence number. The underlying cause is
// appended only when one is present, so a ReplayError built without Err does
// not render a dangling ": <nil>" suffix. A nil receiver yields "<nil>" rather
// than panicking, which matters when a typed nil pointer ends up stored in an
// error interface value.
func (e *ReplayError) Error() string {
	if e == nil {
		return "<nil>"
	}
	if e.Err == nil {
		return fmt.Sprintf("failed to unmarshal event at sequence %d", e.SequenceNumber)
	}
	return fmt.Sprintf("failed to unmarshal event at sequence %d: %v", e.SequenceNumber, e.Err)
}

// Unwrap returns the underlying unmarshal error so that errors.Is and
// errors.As can see through a ReplayError. It returns nil for a nil receiver.
func (e *ReplayError) Unwrap() error {
	if e == nil {
		return nil
	}
	return e.Err
}
|
||||||
|
|
||||||
|
// ReplayResult contains the results of replaying events, including any errors encountered.
|
||||||
|
// This allows callers to decide how to handle malformed events rather than silently skipping them.
|
||||||
|
type ReplayResult struct {
|
||||||
|
// Events contains the successfully unmarshaled events
|
||||||
|
Events []*Event
|
||||||
|
// Errors contains information about any malformed events encountered
|
||||||
|
Errors []ReplayError
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasErrors returns true if any malformed events were encountered during replay
|
||||||
|
func (r *ReplayResult) HasErrors() bool {
|
||||||
|
return len(r.Errors) > 0
|
||||||
|
}
|
||||||
|
|
||||||
// Event represents a domain event in the system
|
// Event represents a domain event in the system
|
||||||
type Event struct {
|
type Event struct {
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
@@ -174,6 +207,18 @@ type EventStore interface {
|
|||||||
GetLatestVersion(actorID string) (int64, error)
|
GetLatestVersion(actorID string) (int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EventStoreWithErrors extends EventStore with methods that report malformed events.
|
||||||
|
// Stores that may encounter corrupted data during replay (e.g., JetStream) should
|
||||||
|
// implement this interface to give callers visibility into data quality issues.
|
||||||
|
type EventStoreWithErrors interface {
|
||||||
|
EventStore
|
||||||
|
|
||||||
|
// GetEventsWithErrors retrieves events for an actor and reports any malformed
|
||||||
|
// events encountered. This method allows callers to decide how to handle
|
||||||
|
// corrupted data rather than silently skipping it.
// The returned ReplayResult carries decoded events in Events and one
// ReplayError per undecodable message in Errors; the error return value is
// separate from those per-message entries.
// NOTE(review): fromVersion is presumably an exclusive lower bound on the
// event version — confirm against each implementation.
|
||||||
|
GetEventsWithErrors(actorID string, fromVersion int64) (*ReplayResult, error)
|
||||||
|
}
|
||||||
|
|
||||||
// SnapshotStore extends EventStore with snapshot capabilities
|
// SnapshotStore extends EventStore with snapshot capabilities
|
||||||
type SnapshotStore interface {
|
type SnapshotStore interface {
|
||||||
EventStore
|
EventStore
|
||||||
|
|||||||
127
event_test.go
127
event_test.go
@@ -1208,3 +1208,130 @@ func TestEvent_MetadataAllHelpersRoundTrip(t *testing.T) {
|
|||||||
t.Errorf("GetSpanID mismatch: got %q", decoded.GetSpanID())
|
t.Errorf("GetSpanID mismatch: got %q", decoded.GetSpanID())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests for ReplayError and ReplayResult types
|
||||||
|
|
||||||
|
func TestReplayError_Error(t *testing.T) {
|
||||||
|
err := &ReplayError{
|
||||||
|
SequenceNumber: 42,
|
||||||
|
RawData: []byte(`invalid json`),
|
||||||
|
Err: json.Unmarshal([]byte(`{`), &struct{}{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
errMsg := err.Error()
|
||||||
|
if !strings.Contains(errMsg, "42") {
|
||||||
|
t.Errorf("expected error message to contain sequence number, got: %s", errMsg)
|
||||||
|
}
|
||||||
|
if !strings.Contains(errMsg, "unmarshal") || !strings.Contains(errMsg, "failed") {
|
||||||
|
t.Errorf("expected error message to contain 'failed' and 'unmarshal', got: %s", errMsg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplayError_Unwrap(t *testing.T) {
|
||||||
|
innerErr := json.Unmarshal([]byte(`{`), &struct{}{})
|
||||||
|
err := &ReplayError{
|
||||||
|
SequenceNumber: 1,
|
||||||
|
RawData: []byte(`{`),
|
||||||
|
Err: innerErr,
|
||||||
|
}
|
||||||
|
|
||||||
|
unwrapped := err.Unwrap()
|
||||||
|
if unwrapped != innerErr {
|
||||||
|
t.Errorf("expected Unwrap to return inner error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplayResult_HasErrors(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
result *ReplayResult
|
||||||
|
expected bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no errors",
|
||||||
|
result: &ReplayResult{Events: []*Event{}, Errors: []ReplayError{}},
|
||||||
|
expected: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "nil errors slice",
|
||||||
|
result: &ReplayResult{Events: []*Event{}, Errors: nil},
|
||||||
|
expected: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "has errors",
|
||||||
|
result: &ReplayResult{
|
||||||
|
Events: []*Event{},
|
||||||
|
Errors: []ReplayError{
|
||||||
|
{SequenceNumber: 1, RawData: []byte(`bad`), Err: nil},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "has events and errors",
|
||||||
|
result: &ReplayResult{
|
||||||
|
Events: []*Event{{ID: "evt-1"}},
|
||||||
|
Errors: []ReplayError{
|
||||||
|
{SequenceNumber: 2, RawData: []byte(`bad`), Err: nil},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
if got := tt.result.HasErrors(); got != tt.expected {
|
||||||
|
t.Errorf("HasErrors() = %v, want %v", got, tt.expected)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplayResult_EmptyResult(t *testing.T) {
|
||||||
|
result := &ReplayResult{
|
||||||
|
Events: []*Event{},
|
||||||
|
Errors: []ReplayError{},
|
||||||
|
}
|
||||||
|
|
||||||
|
if result.HasErrors() {
|
||||||
|
t.Error("expected HasErrors() to return false for empty result")
|
||||||
|
}
|
||||||
|
if len(result.Events) != 0 {
|
||||||
|
t.Errorf("expected 0 events, got %d", len(result.Events))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplayError_WithZeroSequence(t *testing.T) {
|
||||||
|
err := &ReplayError{
|
||||||
|
SequenceNumber: 0,
|
||||||
|
RawData: []byte(`corrupted`),
|
||||||
|
Err: json.Unmarshal([]byte(`not-json`), &struct{}{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
errMsg := err.Error()
|
||||||
|
if !strings.Contains(errMsg, "sequence 0") {
|
||||||
|
t.Errorf("expected error message to contain 'sequence 0', got: %s", errMsg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplayError_WithLargeRawData(t *testing.T) {
|
||||||
|
largeData := make([]byte, 1024*1024) // 1MB
|
||||||
|
for i := range largeData {
|
||||||
|
largeData[i] = 'x'
|
||||||
|
}
|
||||||
|
|
||||||
|
err := &ReplayError{
|
||||||
|
SequenceNumber: 999,
|
||||||
|
RawData: largeData,
|
||||||
|
Err: json.Unmarshal(largeData, &struct{}{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should be able to create the error without issues
|
||||||
|
if len(err.RawData) != 1024*1024 {
|
||||||
|
t.Errorf("expected RawData to be preserved, got length %d", len(err.RawData))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error() should still work
|
||||||
|
_ = err.Error()
|
||||||
|
}
|
||||||
|
|||||||
@@ -33,7 +33,8 @@ func DefaultJetStreamConfig() JetStreamConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// JetStreamEventStore implements EventStore using NATS JetStream for persistence
|
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
|
||||||
|
// It also implements EventStoreWithErrors to report malformed events during replay.
|
||||||
type JetStreamEventStore struct {
|
type JetStreamEventStore struct {
|
||||||
js nats.JetStreamContext
|
js nats.JetStreamContext
|
||||||
streamName string
|
streamName string
|
||||||
@@ -139,18 +140,18 @@ func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, e
|
|||||||
return version, nil
|
return version, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch from JetStream
|
// Fetch from JetStream - use internal method that returns result
|
||||||
events, err := jes.getEventsInternal(actorID, 0)
|
result, err := jes.getEventsWithErrorsInternal(actorID, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(events) == 0 {
|
if len(result.Events) == 0 {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
latestVersion := int64(0)
|
latestVersion := int64(0)
|
||||||
for _, event := range events {
|
for _, event := range result.Events {
|
||||||
if event.Version > latestVersion {
|
if event.Version > latestVersion {
|
||||||
latestVersion = event.Version
|
latestVersion = event.Version
|
||||||
}
|
}
|
||||||
@@ -162,13 +163,27 @@ func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, e
|
|||||||
return latestVersion, nil
|
return latestVersion, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetEvents retrieves all events for an actor since a version
|
// GetEvents retrieves all events for an actor since a version.
|
||||||
|
// Note: This method silently skips malformed events for backward compatibility.
|
||||||
|
// Use GetEventsWithErrors to receive information about malformed events.
|
||||||
func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||||
return jes.getEventsInternal(actorID, fromVersion)
|
result, err := jes.getEventsWithErrorsInternal(actorID, fromVersion)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return result.Events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getEventsInternal is the internal implementation of GetEvents
|
// GetEventsWithErrors retrieves events for an actor and reports any malformed
|
||||||
func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
// events encountered. This method allows callers to decide how to handle
|
||||||
|
// corrupted data rather than silently skipping it.
|
||||||
|
func (jes *JetStreamEventStore) GetEventsWithErrors(actorID string, fromVersion int64) (*aether.ReplayResult, error) {
|
||||||
|
// Thin exported wrapper: both arguments are passed through unchanged and the
// result and error from getEventsWithErrorsInternal are returned as-is.
return jes.getEventsWithErrorsInternal(actorID, fromVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getEventsWithErrorsInternal is the internal implementation that tracks both
|
||||||
|
// successfully parsed events and errors for malformed events.
|
||||||
|
func (jes *JetStreamEventStore) getEventsWithErrorsInternal(actorID string, fromVersion int64) (*aether.ReplayResult, error) {
|
||||||
// Create subject filter for this actor
|
// Create subject filter for this actor
|
||||||
subject := fmt.Sprintf("%s.events.%s.%s",
|
subject := fmt.Sprintf("%s.events.%s.%s",
|
||||||
jes.streamName,
|
jes.streamName,
|
||||||
@@ -182,7 +197,10 @@ func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion in
|
|||||||
}
|
}
|
||||||
defer consumer.Unsubscribe()
|
defer consumer.Unsubscribe()
|
||||||
|
|
||||||
var events []*aether.Event
|
result := &aether.ReplayResult{
|
||||||
|
Events: make([]*aether.Event, 0),
|
||||||
|
Errors: make([]aether.ReplayError, 0),
|
||||||
|
}
|
||||||
|
|
||||||
// Fetch messages in batches
|
// Fetch messages in batches
|
||||||
for {
|
for {
|
||||||
@@ -197,12 +215,24 @@ func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion in
|
|||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
var event aether.Event
|
var event aether.Event
|
||||||
if err := json.Unmarshal(msg.Data, &event); err != nil {
|
if err := json.Unmarshal(msg.Data, &event); err != nil {
|
||||||
continue // Skip malformed events
|
// Record the error with context instead of silently skipping
|
||||||
|
metadata, _ := msg.Metadata()
|
||||||
|
seqNum := uint64(0)
|
||||||
|
if metadata != nil {
|
||||||
|
seqNum = metadata.Sequence.Stream
|
||||||
|
}
|
||||||
|
result.Errors = append(result.Errors, aether.ReplayError{
|
||||||
|
SequenceNumber: seqNum,
|
||||||
|
RawData: msg.Data,
|
||||||
|
Err: err,
|
||||||
|
})
|
||||||
|
msg.Ack() // Still ack to prevent redelivery
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter by version
|
// Filter by version
|
||||||
if event.Version > fromVersion {
|
if event.Version > fromVersion {
|
||||||
events = append(events, &event)
|
result.Events = append(result.Events, &event)
|
||||||
}
|
}
|
||||||
|
|
||||||
msg.Ack()
|
msg.Ack()
|
||||||
@@ -213,7 +243,7 @@ func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion in
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return events, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLatestVersion returns the latest version for an actor
|
// GetLatestVersion returns the latest version for an actor
|
||||||
@@ -316,3 +346,6 @@ func sanitizeSubject(s string) string {
|
|||||||
s = strings.ReplaceAll(s, ">", "_")
|
s = strings.ReplaceAll(s, ">", "_")
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors.
// Assigning a typed nil pointer to the blank identifier makes the build fail
// if *JetStreamEventStore lacks a method the interface requires.
|
||||||
|
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)
|
||||||
|
|||||||
Reference in New Issue
Block a user