Document that EventStore interface has no Update/Delete methods, enforcing
append-only semantics by design. Events are immutable once persisted.

Changes:
- Update EventStore interface documentation in event.go to explicitly state
  the immutability guarantee and explain why Update/Delete methods are absent
- Add detailed retention policy documentation to JetStreamConfig showing how
  MaxAge limits enforce automatic expiration without manual deletion
- Document JetStreamEventStore's immutability guarantee with a storage-level
  explanation of file-based storage and limits-based retention
- Add comprehensive immutability tests verifying:
  - Events cannot be modified after persistence
  - No Update or Delete methods exist on EventStore interface
  - Versions are monotonically increasing
  - Events cannot be deleted through the API
- Update README with detailed immutability section explaining:
  - Interface-level append-only guarantee
  - Storage-level immutability through JetStream configuration
  - Audit trail reliability
  - Pattern for handling corrections (append new event)

Closes #60

Co-Authored-By: Claude Code <noreply@anthropic.com>
250 lines
8.9 KiB
Go
250 lines
8.9 KiB
Go
package aether
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// ErrVersionConflict is returned when attempting to save an event with a version
// that is not strictly greater than the current latest version for an actor.
// This ensures events have monotonically increasing versions per actor.
//
// VersionConflictError unwraps to this sentinel, so callers can detect the
// condition with errors.Is(err, ErrVersionConflict).
var ErrVersionConflict = errors.New("version conflict")

// VersionConflictError provides details about a version conflict.
// It is returned when SaveEvent is called with a version <= the current latest version.
type VersionConflictError struct {
	// ActorID identifies the actor whose stream rejected the event.
	ActorID string
	// AttemptedVersion is the version carried by the rejected event.
	AttemptedVersion int64
	// CurrentVersion is the latest version already recorded for the actor.
	CurrentVersion int64
}

// Error implements the error interface. The message starts with the text of
// ErrVersionConflict and then reports the actor, its current version and the
// version that was attempted.
func (e *VersionConflictError) Error() string {
	return fmt.Sprintf("%s: actor %q has version %d, cannot save version %d",
		ErrVersionConflict, e.ActorID, e.CurrentVersion, e.AttemptedVersion)
}

// Unwrap returns ErrVersionConflict so that errors.Is matches the sentinel
// for any VersionConflictError.
func (e *VersionConflictError) Unwrap() error {
	return ErrVersionConflict
}
|
|
|
|
// ReplayError captures information about a malformed event encountered during replay.
// This allows callers to inspect and handle corrupted data without losing context.
type ReplayError struct {
	// SequenceNumber is the sequence number of the message in the stream (if available).
	SequenceNumber uint64
	// RawData is the raw bytes that could not be unmarshaled.
	RawData []byte
	// Err is the underlying unmarshal error.
	Err error
}

// Error implements the error interface, reporting the sequence number together
// with the underlying error. RawData is not included in the message.
func (e *ReplayError) Error() string {
	return fmt.Sprintf("failed to unmarshal event at sequence %d: %v", e.SequenceNumber, e.Err)
}

// Unwrap returns the underlying error so that errors.Is and errors.As can
// inspect it.
func (e *ReplayError) Unwrap() error {
	return e.Err
}
|
|
|
|
// ReplayResult contains the results of replaying events, including any errors encountered.
|
|
// This allows callers to decide how to handle malformed events rather than silently skipping them.
|
|
type ReplayResult struct {
|
|
// Events contains the successfully unmarshaled events
|
|
Events []*Event
|
|
// Errors contains information about any malformed events encountered
|
|
Errors []ReplayError
|
|
}
|
|
|
|
// HasErrors returns true if any malformed events were encountered during replay
|
|
func (r *ReplayResult) HasErrors() bool {
|
|
return len(r.Errors) > 0
|
|
}
|
|
|
|
// Event represents a domain event in the system.
type Event struct {
	// ID identifies the event.
	ID string `json:"id"`
	// EventType names the kind of event (for example EventTypeEventStored).
	EventType string `json:"eventType"`
	// ActorID identifies the actor the event belongs to.
	ActorID string `json:"actorId"`
	// CommandID is the correlation ID for the command that triggered this event.
	CommandID string `json:"commandId,omitempty"`
	// Version is the event's version within its actor's stream.
	Version int64 `json:"version"`
	// Data carries the event payload.
	Data map[string]interface{} `json:"data"`
	// Metadata is optional metadata for tracing and auditing.
	Metadata map[string]string `json:"metadata,omitempty"`
	// Timestamp is the time associated with the event.
	Timestamp time.Time `json:"timestamp"`
}

// Common event types for Aether infrastructure.
const (
	// EventTypeEventStored is an internal event published when an event is successfully persisted.
	// This event allows observability components (metrics, projections, audit systems) to react
	// to persisted events without coupling to application code.
	EventTypeEventStored = "EventStored"
)

// Common metadata keys for distributed tracing and auditing.
const (
	// MetadataKeyCorrelationID identifies related events across services.
	MetadataKeyCorrelationID = "correlationId"
	// MetadataKeyCausationID identifies the event that caused this event.
	MetadataKeyCausationID = "causationId"
	// MetadataKeyUserID identifies the user who triggered this event.
	MetadataKeyUserID = "userId"
	// MetadataKeyTraceID is for distributed tracing integration (e.g., OpenTelemetry).
	MetadataKeyTraceID = "traceId"
	// MetadataKeySpanID is for distributed tracing integration.
	MetadataKeySpanID = "spanId"
)

// SetMetadata sets a metadata key-value pair, initializing the map if needed.
// An existing value for key is overwritten.
func (e *Event) SetMetadata(key, value string) {
	if e.Metadata == nil {
		// Writing to a nil map panics, so allocate on first use.
		e.Metadata = make(map[string]string)
	}
	e.Metadata[key] = value
}

// GetMetadata returns the value for a metadata key, or the empty string if the
// key is not present or no metadata has been set.
func (e *Event) GetMetadata(key string) string {
	// Indexing a nil map is safe in Go and yields the zero value, so no
	// explicit nil check is required.
	return e.Metadata[key]
}

// SetCorrelationID sets the correlation ID metadata.
func (e *Event) SetCorrelationID(correlationID string) {
	e.SetMetadata(MetadataKeyCorrelationID, correlationID)
}

// GetCorrelationID returns the correlation ID metadata, or "" if unset.
func (e *Event) GetCorrelationID() string {
	return e.GetMetadata(MetadataKeyCorrelationID)
}

// SetCausationID sets the causation ID metadata.
func (e *Event) SetCausationID(causationID string) {
	e.SetMetadata(MetadataKeyCausationID, causationID)
}

// GetCausationID returns the causation ID metadata, or "" if unset.
func (e *Event) GetCausationID() string {
	return e.GetMetadata(MetadataKeyCausationID)
}

// SetUserID sets the user ID metadata.
func (e *Event) SetUserID(userID string) {
	e.SetMetadata(MetadataKeyUserID, userID)
}

// GetUserID returns the user ID metadata, or "" if unset.
func (e *Event) GetUserID() string {
	return e.GetMetadata(MetadataKeyUserID)
}

// SetTraceID sets the trace ID metadata for distributed tracing.
func (e *Event) SetTraceID(traceID string) {
	e.SetMetadata(MetadataKeyTraceID, traceID)
}

// GetTraceID returns the trace ID metadata, or "" if unset.
func (e *Event) GetTraceID() string {
	return e.GetMetadata(MetadataKeyTraceID)
}

// SetSpanID sets the span ID metadata for distributed tracing.
func (e *Event) SetSpanID(spanID string) {
	e.SetMetadata(MetadataKeySpanID, spanID)
}

// GetSpanID returns the span ID metadata, or "" if unset.
func (e *Event) GetSpanID() string {
	return e.GetMetadata(MetadataKeySpanID)
}

// WithMetadataFrom copies metadata from another event (useful for event chaining).
//
// Entries are copied one by one into e's own map, so the two events never
// share a map. Keys already present on e are overwritten by source's values;
// keys that exist only on e are kept. A nil source, or a source without
// metadata, leaves e untouched.
func (e *Event) WithMetadataFrom(source *Event) {
	if source == nil || source.Metadata == nil {
		return
	}
	if e.Metadata == nil {
		e.Metadata = make(map[string]string)
	}
	for k, v := range source.Metadata {
		e.Metadata[k] = v
	}
}
|
|
|
|
// ActorSnapshot represents a point-in-time state snapshot of an actor.
type ActorSnapshot struct {
	// ActorID identifies the actor the snapshot belongs to.
	ActorID string `json:"actorId"`
	// Version is the version associated with the snapshot.
	// NOTE(review): presumably the version of the last event folded into State — confirm.
	Version int64 `json:"version"`
	// State holds the captured actor state.
	State map[string]interface{} `json:"state"`
	// Timestamp is the time associated with the snapshot.
	Timestamp time.Time `json:"timestamp"`
}
|
|
|
|
// EventStore defines the interface for event persistence.
|
|
//
|
|
// # Immutability Guarantee
|
|
//
|
|
// EventStore is append-only. Once an event is persisted via SaveEvent, it is never
|
|
// modified or deleted. The interface intentionally provides no Update or Delete methods.
|
|
// This ensures:
|
|
// - Events serve as an immutable audit trail
|
|
// - State can be safely derived by replaying events
|
|
// - Concurrent reads are always safe (events never change)
|
|
//
|
|
// To correct a mistake, append a new event that expresses the correction.
|
|
//
|
|
// # Version Semantics
|
|
//
|
|
// Events for an actor must have monotonically increasing versions. When SaveEvent
|
|
// is called, the implementation must validate that the event's version is strictly
|
|
// greater than the current latest version for that actor. If the version is less
|
|
// than or equal to the current version, SaveEvent must return a VersionConflictError
|
|
// (which wraps ErrVersionConflict).
|
|
//
|
|
// This validation ensures event stream integrity and enables optimistic concurrency
|
|
// control. Clients should:
|
|
// 1. Call GetLatestVersion to get the current version for an actor
|
|
// 2. Set the new event's version to currentVersion + 1
|
|
// 3. Call SaveEvent - if it returns ErrVersionConflict, another writer won
|
|
// 4. On conflict, reload the latest version and retry if appropriate
|
|
//
|
|
// For new actors (no existing events), version 1 is expected for the first event.
|
|
type EventStore interface {
|
|
// SaveEvent persists an event to the store. The event's Version must be
|
|
// strictly greater than the current latest version for the actor.
|
|
// Returns VersionConflictError if version <= current latest version.
|
|
// Once saved, the event is immutable and can never be modified or deleted.
|
|
SaveEvent(event *Event) error
|
|
|
|
// GetEvents retrieves events for an actor from a specific version (inclusive).
|
|
// Returns an empty slice if no events exist for the actor.
|
|
// The returned events are guaranteed to be immutable - they will never be
|
|
// modified or deleted from the store.
|
|
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
|
|
|
|
// GetLatestVersion returns the latest version for an actor.
|
|
// Returns 0 if no events exist for the actor.
|
|
GetLatestVersion(actorID string) (int64, error)
|
|
}
|
|
|
|
// EventStoreWithErrors extends EventStore with methods that report malformed events.
|
|
// Stores that may encounter corrupted data during replay (e.g., JetStream) should
|
|
// implement this interface to give callers visibility into data quality issues.
|
|
type EventStoreWithErrors interface {
|
|
EventStore
|
|
|
|
// GetEventsWithErrors retrieves events for an actor and reports any malformed
|
|
// events encountered. This method allows callers to decide how to handle
|
|
// corrupted data rather than silently skipping it.
|
|
GetEventsWithErrors(actorID string, fromVersion int64) (*ReplayResult, error)
|
|
}
|
|
|
|
// SnapshotStore extends EventStore with snapshot capabilities
|
|
type SnapshotStore interface {
|
|
EventStore
|
|
GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
|
|
SaveSnapshot(snapshot *ActorSnapshot) error
|
|
}
|