Add ReplayError and ReplayResult types to capture information about malformed events encountered during replay. This allows callers to inspect and handle corrupted data rather than having it silently skipped.

Key changes:
- Add ReplayError type with sequence number, raw data, and underlying error
- Add ReplayResult type containing both successfully parsed events and errors
- Add EventStoreWithErrors interface for stores that can report replay errors
- Implement GetEventsWithErrors on JetStreamEventStore
- Update GetEvents to maintain backward compatibility (still skips malformed)
- Add comprehensive unit tests for the new types

This addresses the issue of silent data loss during event-sourced replay by giving callers visibility into data quality issues.

Closes #39

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
228 lines
7.9 KiB
Go
228 lines
7.9 KiB
Go
package aether
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// ErrVersionConflict is the sentinel reported when an event is saved with a
// version that is not strictly greater than the actor's current latest version.
// Rejecting such saves keeps versions monotonically increasing per actor.
var ErrVersionConflict = errors.New("version conflict")

// VersionConflictError describes a single version conflict: the actor involved,
// the version the caller tried to write, and the version that was already the
// latest. It is returned when SaveEvent is called with a version that is less
// than or equal to the current latest version.
type VersionConflictError struct {
	ActorID          string // actor whose stream rejected the event
	AttemptedVersion int64  // version the caller tried to save
	CurrentVersion   int64  // latest version already recorded for the actor
}

// Error formats the conflict as the sentinel's text followed by the quoted
// actor ID, the current version, and the attempted version.
func (e *VersionConflictError) Error() string {
	const layout = "%s: actor %q has version %d, cannot save version %d"
	return fmt.Sprintf(layout, ErrVersionConflict, e.ActorID, e.CurrentVersion, e.AttemptedVersion)
}

// Unwrap always yields ErrVersionConflict, so errors.Is(err, ErrVersionConflict)
// matches any *VersionConflictError.
func (e *VersionConflictError) Unwrap() error { return ErrVersionConflict }
|
|
|
|
// ReplayError records one malformed event met during replay, keeping enough
// context for the caller to inspect or report the corrupted data.
type ReplayError struct {
	// SequenceNumber is the message's sequence number in the stream, when one
	// is available.
	SequenceNumber uint64

	// RawData holds the raw bytes that could not be unmarshaled.
	RawData []byte

	// Err is the underlying unmarshal error.
	Err error
}

// Error reports the sequence number followed by the text of the underlying
// error.
func (e *ReplayError) Error() string {
	seq, cause := e.SequenceNumber, e.Err
	return fmt.Sprintf("failed to unmarshal event at sequence %d: %v", seq, cause)
}

// Unwrap returns Err so that errors.Is and errors.As can look through a
// ReplayError to the underlying cause.
func (e *ReplayError) Unwrap() error { return e.Err }
|
|
|
|
// ReplayResult contains the results of replaying events, including any errors encountered.
|
|
// This allows callers to decide how to handle malformed events rather than silently skipping them.
|
|
type ReplayResult struct {
|
|
// Events contains the successfully unmarshaled events
|
|
Events []*Event
|
|
// Errors contains information about any malformed events encountered
|
|
Errors []ReplayError
|
|
}
|
|
|
|
// HasErrors returns true if any malformed events were encountered during replay
|
|
func (r *ReplayResult) HasErrors() bool {
|
|
return len(r.Errors) > 0
|
|
}
|
|
|
|
// Event is a domain event in the system. The struct tags define its JSON field
// names.
type Event struct {
	ID        string `json:"id"`
	EventType string `json:"eventType"`
	ActorID   string `json:"actorId"`

	// CommandID is the correlation ID of the command that triggered this event.
	CommandID string `json:"commandId,omitempty"`

	Version int64                  `json:"version"`
	Data    map[string]interface{} `json:"data"`

	// Metadata is optional and carries values for tracing and auditing.
	Metadata map[string]string `json:"metadata,omitempty"`

	Timestamp time.Time `json:"timestamp"`
}

// Well-known Metadata keys used for distributed tracing and auditing.
const (
	// MetadataKeyCorrelationID ties together related events across services.
	MetadataKeyCorrelationID = "correlationId"

	// MetadataKeyCausationID names the event that caused this event.
	MetadataKeyCausationID = "causationId"

	// MetadataKeyUserID names the user who triggered this event.
	MetadataKeyUserID = "userId"

	// MetadataKeyTraceID holds the trace ID for distributed tracing
	// integration (e.g., OpenTelemetry).
	MetadataKeyTraceID = "traceId"

	// MetadataKeySpanID holds the span ID for distributed tracing integration.
	MetadataKeySpanID = "spanId"
)

// SetMetadata stores value under key, creating the Metadata map on first use.
func (e *Event) SetMetadata(key, value string) {
	if e.Metadata != nil {
		e.Metadata[key] = value
		return
	}
	e.Metadata = map[string]string{key: value}
}

// GetMetadata returns the value stored under key, or the empty string when the
// key is absent or no metadata has been set.
func (e *Event) GetMetadata(key string) string {
	// Reading from a nil map yields the zero value, so no nil check is needed.
	return e.Metadata[key]
}

// SetCorrelationID stores correlationID under MetadataKeyCorrelationID.
func (e *Event) SetCorrelationID(correlationID string) {
	e.SetMetadata(MetadataKeyCorrelationID, correlationID)
}

// GetCorrelationID returns the value stored under MetadataKeyCorrelationID.
func (e *Event) GetCorrelationID() string {
	return e.GetMetadata(MetadataKeyCorrelationID)
}

// SetCausationID stores causationID under MetadataKeyCausationID.
func (e *Event) SetCausationID(causationID string) {
	e.SetMetadata(MetadataKeyCausationID, causationID)
}

// GetCausationID returns the value stored under MetadataKeyCausationID.
func (e *Event) GetCausationID() string {
	return e.GetMetadata(MetadataKeyCausationID)
}

// SetUserID stores userID under MetadataKeyUserID.
func (e *Event) SetUserID(userID string) {
	e.SetMetadata(MetadataKeyUserID, userID)
}

// GetUserID returns the value stored under MetadataKeyUserID.
func (e *Event) GetUserID() string {
	return e.GetMetadata(MetadataKeyUserID)
}

// SetTraceID stores traceID under MetadataKeyTraceID for distributed tracing.
func (e *Event) SetTraceID(traceID string) {
	e.SetMetadata(MetadataKeyTraceID, traceID)
}

// GetTraceID returns the value stored under MetadataKeyTraceID.
func (e *Event) GetTraceID() string {
	return e.GetMetadata(MetadataKeyTraceID)
}

// SetSpanID stores spanID under MetadataKeySpanID for distributed tracing.
func (e *Event) SetSpanID(spanID string) {
	e.SetMetadata(MetadataKeySpanID, spanID)
}

// GetSpanID returns the value stored under MetadataKeySpanID.
func (e *Event) GetSpanID() string {
	return e.GetMetadata(MetadataKeySpanID)
}

// WithMetadataFrom copies every metadata entry of source into e, overwriting
// entries that share a key; this is useful when chaining events. It does
// nothing when source is nil or has a nil Metadata map.
func (e *Event) WithMetadataFrom(source *Event) {
	if source == nil {
		return
	}
	inherited := source.Metadata
	if inherited == nil {
		return
	}

	if e.Metadata == nil {
		e.Metadata = make(map[string]string, len(inherited))
	}
	for key, value := range inherited {
		e.Metadata[key] = value
	}
}
|
|
|
|
// ActorSnapshot is a point-in-time snapshot of state. The struct tags define
// its JSON field names.
type ActorSnapshot struct {
	ActorID string `json:"actorId"`
	Version int64  `json:"version"`

	State map[string]interface{} `json:"state"`

	Timestamp time.Time `json:"timestamp"`
}
|
|
|
|
// EventStore defines the interface for event persistence.
|
|
//
|
|
// # Version Semantics
|
|
//
|
|
// Events for an actor must have monotonically increasing versions. When SaveEvent
|
|
// is called, the implementation must validate that the event's version is strictly
|
|
// greater than the current latest version for that actor. If the version is less
|
|
// than or equal to the current version, SaveEvent must return a VersionConflictError
|
|
// (which wraps ErrVersionConflict).
|
|
//
|
|
// This validation ensures event stream integrity and enables optimistic concurrency
|
|
// control. Clients should:
|
|
// 1. Call GetLatestVersion to get the current version for an actor
|
|
// 2. Set the new event's version to currentVersion + 1
|
|
// 3. Call SaveEvent - if it returns ErrVersionConflict, another writer won
|
|
// 4. On conflict, reload the latest version and retry if appropriate
|
|
//
|
|
// For new actors (no existing events), version 1 is expected for the first event.
|
|
type EventStore interface {
|
|
// SaveEvent persists an event to the store. The event's Version must be
|
|
// strictly greater than the current latest version for the actor.
|
|
// Returns VersionConflictError if version <= current latest version.
|
|
SaveEvent(event *Event) error
|
|
|
|
// GetEvents retrieves events for an actor from a specific version (inclusive).
|
|
// Returns an empty slice if no events exist for the actor.
|
|
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
|
|
|
|
// GetLatestVersion returns the latest version for an actor.
|
|
// Returns 0 if no events exist for the actor.
|
|
GetLatestVersion(actorID string) (int64, error)
|
|
}
|
|
|
|
// EventStoreWithErrors extends EventStore with methods that report malformed events.
|
|
// Stores that may encounter corrupted data during replay (e.g., JetStream) should
|
|
// implement this interface to give callers visibility into data quality issues.
|
|
type EventStoreWithErrors interface {
|
|
EventStore
|
|
|
|
// GetEventsWithErrors retrieves events for an actor and reports any malformed
|
|
// events encountered. This method allows callers to decide how to handle
|
|
// corrupted data rather than silently skipping it.
|
|
GetEventsWithErrors(actorID string, fromVersion int64) (*ReplayResult, error)
|
|
}
|
|
|
|
// SnapshotStore extends EventStore with snapshot capabilities
|
|
type SnapshotStore interface {
|
|
EventStore
|
|
GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
|
|
SaveSnapshot(snapshot *ActorSnapshot) error
|
|
}
|