Files
aether/event.go
Claude Code 6549125f3d
Some checks failed
CI / build (pull_request) Successful in 22s
CI / build (push) Successful in 21s
CI / integration (pull_request) Failing after 1m59s
CI / integration (push) Failing after 2m0s
docs: Verify and document append-only immutability guarantees
Document that EventStore interface has no Update/Delete methods, enforcing
append-only semantics by design. Events are immutable once persisted.

Changes:
- Update EventStore interface documentation in event.go to explicitly state
  immutability guarantee and explain why Update/Delete methods are absent
- Add detailed retention policy documentation to JetStreamConfig showing
  how MaxAge limits enforce automatic expiration without manual deletion
- Document JetStreamEventStore's immutability guarantee with storage-level
  explanation of file-based storage and limits-based retention
- Add comprehensive immutability tests verifying:
  - Events cannot be modified after persistence
  - No Update or Delete methods exist on EventStore interface
  - Versions are monotonically increasing
  - Events cannot be deleted through the API
- Update README with detailed immutability section explaining:
  - Interface-level append-only guarantee
  - Storage-level immutability through JetStream configuration
  - Audit trail reliability
  - Pattern for handling corrections (append new event)

Closes #60

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 21:39:45 +00:00

250 lines
8.9 KiB
Go

package aether
import (
"errors"
"fmt"
"time"
)
// ErrVersionConflict is the sentinel for a rejected save: the event carried a
// version that was not strictly greater than the actor's current latest
// version. Versions must increase monotonically per actor.
//
// The detailed error is *VersionConflictError, which unwraps to this value, so
// errors.Is(err, ErrVersionConflict) matches it.
var ErrVersionConflict = errors.New("version conflict")

// VersionConflictError carries the details of one version conflict.
// It is returned when SaveEvent is called with a version <= the current
// latest version for the actor.
type VersionConflictError struct {
	// ActorID is the actor named in the conflict message.
	ActorID string
	// AttemptedVersion is the version that could not be saved.
	AttemptedVersion int64
	// CurrentVersion is the version the actor already has.
	CurrentVersion int64
}

// Error renders the conflict as the sentinel's text followed by the quoted
// actor ID, the current version and the attempted version.
func (e *VersionConflictError) Error() string {
	const layout = "%s: actor %q has version %d, cannot save version %d"
	return fmt.Sprintf(layout, ErrVersionConflict, e.ActorID, e.CurrentVersion, e.AttemptedVersion)
}

// Unwrap always yields ErrVersionConflict, which is what lets errors.Is match
// a *VersionConflictError against the sentinel.
func (e *VersionConflictError) Unwrap() error { return ErrVersionConflict }
// ReplayError records one malformed event met during replay, keeping enough
// context (position, raw payload, cause) for callers to inspect and handle the
// corrupted data.
type ReplayError struct {
	// SequenceNumber is the message's sequence number in the stream, if available.
	SequenceNumber uint64
	// RawData holds the raw bytes that could not be unmarshaled.
	RawData []byte
	// Err is the underlying unmarshal error.
	Err error
}

// Error reports the sequence number together with the underlying cause.
// RawData is deliberately not part of the message.
func (e *ReplayError) Error() string {
	seq, cause := e.SequenceNumber, e.Err
	return fmt.Sprintf("failed to unmarshal event at sequence %d: %v", seq, cause)
}

// Unwrap returns the underlying error so errors.Is / errors.As can see
// through a ReplayError to its cause.
func (e *ReplayError) Unwrap() error { return e.Err }
// ReplayResult is the outcome of replaying events: the events that decoded
// successfully plus a record of every malformed one. Returning both lets the
// caller decide how to treat malformed events instead of having them silently
// skipped.
type ReplayResult struct {
	// Events contains the successfully unmarshaled events.
	Events []*Event
	// Errors contains one entry per malformed event encountered.
	Errors []ReplayError
}

// HasErrors reports whether at least one malformed event was recorded in
// Errors during replay.
func (r *ReplayResult) HasErrors() bool {
	return len(r.Errors) != 0
}
// Event is a domain event in the system. It is serialized to JSON using the
// field names given in the struct tags.
type Event struct {
	// ID identifies the event.
	ID string `json:"id"`
	// EventType names the kind of event (for example EventTypeEventStored).
	EventType string `json:"eventType"`
	// ActorID is the actor the event belongs to.
	ActorID string `json:"actorId"`
	// CommandID is the correlation ID of the command that triggered this
	// event. It is omitted from JSON when empty.
	CommandID string `json:"commandId,omitempty"`
	// Version is the event's version within its actor's stream.
	Version int64 `json:"version"`
	// Data carries the event's free-form key/value content.
	Data map[string]interface{} `json:"data"`
	// Metadata holds optional key/value pairs for tracing and auditing.
	// It is omitted from JSON when empty.
	Metadata map[string]string `json:"metadata,omitempty"`
	// Timestamp is the time associated with the event.
	Timestamp time.Time `json:"timestamp"`
}

// Event types used by the Aether infrastructure itself.
const (
	// EventTypeEventStored is an internal event published after an event has
	// been persisted successfully. Observability components (metrics,
	// projections, audit systems) can react to it without being coupled to
	// application code.
	EventTypeEventStored = "EventStored"
)

// Well-known Metadata keys for distributed tracing and auditing.
const (
	// MetadataKeyCorrelationID identifies related events across services.
	MetadataKeyCorrelationID = "correlationId"
	// MetadataKeyCausationID identifies the event that caused this event.
	MetadataKeyCausationID = "causationId"
	// MetadataKeyUserID identifies the user who triggered this event.
	MetadataKeyUserID = "userId"
	// MetadataKeyTraceID is the trace ID for distributed tracing
	// integration (e.g., OpenTelemetry).
	MetadataKeyTraceID = "traceId"
	// MetadataKeySpanID is the span ID for distributed tracing integration.
	MetadataKeySpanID = "spanId"
)

// SetMetadata stores value under key, creating the Metadata map on first use.
// An existing entry for key is overwritten.
func (e *Event) SetMetadata(key, value string) {
	if e.Metadata != nil {
		e.Metadata[key] = value
		return
	}
	e.Metadata = map[string]string{key: value}
}

// GetMetadata returns the value stored under key, or "" when the key is not
// present (including when no metadata has been set at all).
func (e *Event) GetMetadata(key string) string {
	// Indexing a nil map is legal in Go and yields the zero value, so the
	// "no metadata yet" case needs no separate branch.
	return e.Metadata[key]
}

// SetCorrelationID stores the correlation ID under MetadataKeyCorrelationID.
func (e *Event) SetCorrelationID(correlationID string) {
	e.SetMetadata(MetadataKeyCorrelationID, correlationID)
}

// GetCorrelationID returns the correlation ID, or "" if none is set.
func (e *Event) GetCorrelationID() string {
	return e.GetMetadata(MetadataKeyCorrelationID)
}

// SetCausationID stores the causation ID under MetadataKeyCausationID.
func (e *Event) SetCausationID(causationID string) {
	e.SetMetadata(MetadataKeyCausationID, causationID)
}

// GetCausationID returns the causation ID, or "" if none is set.
func (e *Event) GetCausationID() string {
	return e.GetMetadata(MetadataKeyCausationID)
}

// SetUserID stores the user ID under MetadataKeyUserID.
func (e *Event) SetUserID(userID string) {
	e.SetMetadata(MetadataKeyUserID, userID)
}

// GetUserID returns the user ID, or "" if none is set.
func (e *Event) GetUserID() string {
	return e.GetMetadata(MetadataKeyUserID)
}

// SetTraceID stores the distributed-tracing trace ID under MetadataKeyTraceID.
func (e *Event) SetTraceID(traceID string) {
	e.SetMetadata(MetadataKeyTraceID, traceID)
}

// GetTraceID returns the trace ID, or "" if none is set.
func (e *Event) GetTraceID() string {
	return e.GetMetadata(MetadataKeyTraceID)
}

// SetSpanID stores the distributed-tracing span ID under MetadataKeySpanID.
func (e *Event) SetSpanID(spanID string) {
	e.SetMetadata(MetadataKeySpanID, spanID)
}

// GetSpanID returns the span ID, or "" if none is set.
func (e *Event) GetSpanID() string {
	return e.GetMetadata(MetadataKeySpanID)
}

// WithMetadataFrom copies every metadata entry of source into e, which is
// useful when chaining events. Entries already present in e under the same key
// are overwritten; other entries of e are kept. The call is a no-op when
// source is nil or has a nil Metadata map. It modifies e in place and returns
// nothing.
func (e *Event) WithMetadataFrom(source *Event) {
	if source == nil {
		return
	}
	inherited := source.Metadata
	if inherited == nil {
		return
	}
	if e.Metadata == nil {
		e.Metadata = make(map[string]string, len(inherited))
	}
	for name, val := range inherited {
		e.Metadata[name] = val
	}
}
// ActorSnapshot is a point-in-time snapshot of an actor's state. It is
// serialized to JSON using the field names given in the struct tags.
type ActorSnapshot struct {
	// ActorID is the actor the snapshot belongs to.
	ActorID string `json:"actorId"`
	// Version is the version recorded with the snapshot.
	// NOTE(review): presumably the version of the last event folded into
	// State - confirm against the snapshot writers.
	Version int64 `json:"version"`
	// State is the captured state as free-form key/value data.
	State map[string]interface{} `json:"state"`
	// Timestamp is the time associated with the snapshot.
	Timestamp time.Time `json:"timestamp"`
}
// EventStore is the persistence contract for events.
//
// # Immutability Guarantee
//
// An EventStore is append-only: an event persisted through SaveEvent is never
// modified or deleted afterwards. The absence of Update and Delete methods on
// this interface is intentional. As a consequence:
//
//   - events form an immutable audit trail;
//   - state can safely be derived by replaying events;
//   - concurrent reads are always safe, because events never change.
//
// Mistakes are corrected by appending a new event that expresses the
// correction, never by rewriting history.
//
// # Version Semantics
//
// Versions of an actor's events must increase monotonically. On every
// SaveEvent call the implementation must check that the event's version is
// strictly greater than the actor's current latest version; when it is less
// than or equal, SaveEvent must return a VersionConflictError (which wraps
// ErrVersionConflict).
//
// This check protects the integrity of the event stream and enables
// optimistic concurrency control. The expected client flow is:
//
//  1. Call GetLatestVersion to learn the actor's current version.
//  2. Set the new event's version to currentVersion + 1.
//  3. Call SaveEvent; an error matching ErrVersionConflict means another
//     writer won.
//  4. On conflict, reload the latest version and retry if appropriate.
//
// A new actor has no events, so its first event is expected to carry
// version 1.
type EventStore interface {
	// SaveEvent appends event to the store. event.Version must be strictly
	// greater than the actor's current latest version; otherwise a
	// VersionConflictError is returned. A saved event is immutable: it can
	// never be modified or deleted.
	SaveEvent(event *Event) error

	// GetEvents returns the actor's events starting at fromVersion
	// (inclusive). When the actor has no events the result is an empty slice.
	// The returned events are guaranteed never to be modified in, or deleted
	// from, the store.
	GetEvents(actorID string, fromVersion int64) ([]*Event, error)

	// GetLatestVersion returns the actor's latest version, or 0 when the
	// actor has no events.
	GetLatestVersion(actorID string) (int64, error)
}
// EventStoreWithErrors is an EventStore that can additionally report
// malformed events. Stores that may run into corrupted data during replay
// (e.g., JetStream) should implement it so callers gain visibility into data
// quality issues.
type EventStoreWithErrors interface {
	EventStore

	// GetEventsWithErrors retrieves the actor's events like GetEvents, and
	// also reports every malformed event encountered in the returned
	// ReplayResult. The caller decides how to handle corrupted data; nothing
	// is silently skipped.
	GetEventsWithErrors(actorID string, fromVersion int64) (*ReplayResult, error)
}
// SnapshotStore is an EventStore that can also persist and load actor
// snapshots.
type SnapshotStore interface {
	EventStore

	// GetLatestSnapshot returns the latest snapshot stored for the actor.
	// NOTE(review): this interface does not state what is returned when the
	// actor has no snapshot - confirm against the implementations.
	GetLatestSnapshot(actorID string) (*ActorSnapshot, error)

	// SaveSnapshot persists the given snapshot.
	SaveSnapshot(snapshot *ActorSnapshot) error
}