package aether

import (
	"errors"
	"fmt"
	"time"
)

// ErrVersionConflict is the sentinel error for an attempt to save an event
// whose version is not strictly greater than the actor's current latest
// version. Versions must increase monotonically per actor.
var ErrVersionConflict = errors.New("version conflict")

// VersionConflictError describes a single version conflict in detail.
// It is returned when SaveEvent is called with a version <= the current
// latest version for the actor.
type VersionConflictError struct {
	ActorID          string
	AttemptedVersion int64
	CurrentVersion   int64
}

// Error renders the conflict as text: the sentinel message, the quoted actor
// ID, the version the actor is currently at, and the version that was rejected.
func (e *VersionConflictError) Error() string {
	const layout = "%s: actor %q has version %d, cannot save version %d"
	return fmt.Sprintf(layout, ErrVersionConflict, e.ActorID, e.CurrentVersion, e.AttemptedVersion)
}

// Unwrap exposes ErrVersionConflict so that errors.Is(err, ErrVersionConflict)
// matches any VersionConflictError.
func (e *VersionConflictError) Unwrap() error {
	return ErrVersionConflict
}

// ReplayError records a malformed event that was encountered during replay.
// It lets callers inspect and handle corrupted data without losing context.
type ReplayError struct {
	// SequenceNumber is the sequence number of the message in the stream (if available).
	SequenceNumber uint64

	// RawData is the raw bytes that could not be unmarshaled.
	RawData []byte

	// Err is the underlying unmarshal error.
	Err error
}

// Error reports the stream sequence number together with the underlying error.
func (e *ReplayError) Error() string {
	const layout = "failed to unmarshal event at sequence %d: %v"
	return fmt.Sprintf(layout, e.SequenceNumber, e.Err)
}

// Unwrap returns the underlying unmarshal error stored in Err.
func (e *ReplayError) Unwrap() error {
	return e.Err
}

// ReplayResult is the outcome of replaying events, including any errors that
// were encountered. Callers decide how to treat malformed events instead of
// having them silently skipped.
type ReplayResult struct {
	// Events holds the events that were unmarshaled successfully.
	Events []*Event

	// Errors holds one entry per malformed event that was encountered.
	Errors []ReplayError
}

// HasErrors reports whether at least one malformed event was recorded.
func (r *ReplayResult) HasErrors() bool {
	return len(r.Errors) != 0
}

// Event is a domain event in the system.
type Event struct {
	ID        string                 `json:"id"`
	EventType string                 `json:"eventType"`
	ActorID   string                 `json:"actorId"`
	CommandID string                 `json:"commandId,omitempty"` // Correlation ID for the command that triggered this event
	Version   int64                  `json:"version"`
	Data      map[string]interface{} `json:"data"`
	Metadata  map[string]string      `json:"metadata,omitempty"` // Optional metadata for tracing and auditing
	Timestamp time.Time              `json:"timestamp"`
}

// Event types used by the Aether infrastructure itself.
const (
	// EventTypeEventStored is an internal event published after an event has
	// been persisted successfully. Observability components (metrics,
	// projections, audit systems) can react to it without coupling to
	// application code.
	EventTypeEventStored = "EventStored"
)

// Well-known metadata keys for distributed tracing and auditing.
const (
	// MetadataKeyCorrelationID identifies related events across services.
	MetadataKeyCorrelationID = "correlationId"

	// MetadataKeyCausationID identifies the event that caused this event.
	MetadataKeyCausationID = "causationId"

	// MetadataKeyUserID identifies the user who triggered this event.
	MetadataKeyUserID = "userId"

	// MetadataKeyTraceID carries the trace ID for distributed tracing
	// integration (e.g., OpenTelemetry).
	MetadataKeyTraceID = "traceId"

	// MetadataKeySpanID carries the span ID for distributed tracing integration.
	MetadataKeySpanID = "spanId"
)

// SetMetadata stores value under key, creating the metadata map on first use.
func (ev *Event) SetMetadata(key, value string) {
	if ev.Metadata == nil {
		ev.Metadata = map[string]string{key: value}
		return
	}
	ev.Metadata[key] = value
}

// GetMetadata returns the value stored under key, or the empty string when the
// key is absent or no metadata has been set.
func (ev *Event) GetMetadata(key string) string {
	// Indexing a nil map is safe and yields the zero value, so a separate nil
	// check is not needed here.
	return ev.Metadata[key]
}

// SetCorrelationID stores the correlation ID under MetadataKeyCorrelationID.
func (ev *Event) SetCorrelationID(correlationID string) {
	ev.SetMetadata(MetadataKeyCorrelationID, correlationID)
}

// GetCorrelationID returns the value stored under MetadataKeyCorrelationID.
func (ev *Event) GetCorrelationID() string {
	return ev.GetMetadata(MetadataKeyCorrelationID)
}

// SetCausationID stores the causation ID under MetadataKeyCausationID.
func (ev *Event) SetCausationID(causationID string) {
	ev.SetMetadata(MetadataKeyCausationID, causationID)
}

// GetCausationID returns the value stored under MetadataKeyCausationID.
func (ev *Event) GetCausationID() string {
	return ev.GetMetadata(MetadataKeyCausationID)
}

// SetUserID stores the user ID under MetadataKeyUserID.
func (ev *Event) SetUserID(userID string) {
	ev.SetMetadata(MetadataKeyUserID, userID)
}

// GetUserID returns the value stored under MetadataKeyUserID.
func (ev *Event) GetUserID() string {
	return ev.GetMetadata(MetadataKeyUserID)
}

// SetTraceID stores the distributed-tracing trace ID under MetadataKeyTraceID.
func (ev *Event) SetTraceID(traceID string) {
	ev.SetMetadata(MetadataKeyTraceID, traceID)
}

// GetTraceID returns the value stored under MetadataKeyTraceID.
func (ev *Event) GetTraceID() string {
	return ev.GetMetadata(MetadataKeyTraceID)
}

// SetSpanID stores the distributed-tracing span ID under MetadataKeySpanID.
func (ev *Event) SetSpanID(spanID string) {
	ev.SetMetadata(MetadataKeySpanID, spanID)
}

// GetSpanID returns the value stored under MetadataKeySpanID.
func (ev *Event) GetSpanID() string {
	return ev.GetMetadata(MetadataKeySpanID)
}

// WithMetadataFrom copies every metadata entry of source into the receiver
// (useful for event chaining). Entries already present under the same key are
// overwritten. A nil source, or a source without a metadata map, leaves the
// receiver untouched.
func (ev *Event) WithMetadataFrom(source *Event) {
	if source == nil {
		return
	}
	inherited := source.Metadata
	if inherited == nil {
		return
	}
	if ev.Metadata == nil {
		ev.Metadata = make(map[string]string)
	}
	for name, val := range inherited {
		ev.Metadata[name] = val
	}
}

// ActorSnapshot is a point-in-time snapshot of an actor's state.
type ActorSnapshot struct {
	ActorID   string                 `json:"actorId"`
	Version   int64                  `json:"version"`
	State     map[string]interface{} `json:"state"`
	Timestamp time.Time              `json:"timestamp"`
}

// EventStore is the persistence contract for events.
//
// # Immutability Guarantee
//
// EventStore is append-only. Once SaveEvent has persisted an event, that event
// is never modified or deleted; the interface intentionally provides no Update
// or Delete methods. This ensures:
//
//   - Events serve as an immutable audit trail
//   - State can be safely derived by replaying events
//   - Concurrent reads are always safe (events never change)
//
// To correct a mistake, append a new event that expresses the correction.
//
// # Version Semantics
//
// Events for an actor must have monotonically increasing versions. When
// SaveEvent is called, the implementation must validate that the event's
// version is strictly greater than the current latest version for that actor.
// If the version is less than or equal to the current version, SaveEvent must
// return a VersionConflictError (which wraps ErrVersionConflict).
//
// This validation ensures event stream integrity and enables optimistic
// concurrency control. Clients should:
//
//  1. Call GetLatestVersion to get the current version for an actor
//  2. Set the new event's version to currentVersion + 1
//  3. Call SaveEvent - if it returns ErrVersionConflict, another writer won
//  4. On conflict, reload the latest version and retry if appropriate
//
// For new actors (no existing events), version 1 is expected for the first event.
type EventStore interface {
	// SaveEvent persists an event to the store. The event's Version must be
	// strictly greater than the current latest version for the actor;
	// otherwise a VersionConflictError is returned. Once saved, the event is
	// immutable and can never be modified or deleted.
	SaveEvent(event *Event) error

	// GetEvents retrieves the events of an actor starting at fromVersion
	// (inclusive). It returns an empty slice if no events exist for the actor.
	// The returned events are guaranteed to be immutable: they will never be
	// modified or deleted from the store.
	GetEvents(actorID string, fromVersion int64) ([]*Event, error)

	// GetLatestVersion returns the latest version for an actor, or 0 if no
	// events exist for the actor.
	GetLatestVersion(actorID string) (int64, error)
}

// EventStoreWithErrors extends EventStore with methods that report malformed
// events. Stores that may encounter corrupted data during replay (e.g.,
// JetStream) should implement this interface to give callers visibility into
// data quality issues.
type EventStoreWithErrors interface {
	EventStore

	// GetEventsWithErrors retrieves events for an actor and reports any
	// malformed events encountered, so that callers can decide how to handle
	// corrupted data rather than having it silently skipped.
	GetEventsWithErrors(actorID string, fromVersion int64) (*ReplayResult, error)
}

// SnapshotStore extends EventStore with snapshot capabilities.
type SnapshotStore interface {
	EventStore

	// GetLatestSnapshot returns the latest snapshot for the given actor.
	// NOTE(review): the result when no snapshot exists is not specified by
	// this interface; confirm against the implementations.
	GetLatestSnapshot(actorID string) (*ActorSnapshot, error)

	// SaveSnapshot persists the given snapshot.
	SaveSnapshot(snapshot *ActorSnapshot) error
}