Domain Model: Event Sourcing
Summary
The Event Sourcing bounded context is Aether's core responsibility: persist events as an immutable, append-only source of truth and enable state reconstruction through replay. This context enforces two critical invariants:
- Monotonic Versioning: Each actor's event stream must have strictly increasing version numbers to prevent concurrent writes and enable optimistic concurrency control.
- Append-Only Persistence: Events, once written, are immutable and never deleted or modified—they form an authoritative audit trail.
This context does not model domain-specific business logic (that is the responsibility of downstream bounded contexts); instead, it provides composable primitives for any domain to build event-sourced systems. State is always derived by replaying events, never stored directly.
Problem Space
User Journeys
Journey 1: Store an event
- Actor initiates change (command)
- System derives new event
- Writer attempts to persist with next version
- If version conflict → writer lost race, must reload and retry
- If success → event immutable forever
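A minimal sketch of this journey against the EventStore operations described later in this document; Command and deriveEvent are hypothetical domain-side names, and the retry policy shown is one possible choice, not library behavior.
func handleCommand(store aether.EventStore, actorID string, cmd Command, maxAttempts int) error {
    for attempt := 0; attempt < maxAttempts; attempt++ {
        // Read the current version to prepare the next write.
        current, err := store.GetLatestVersion(actorID)
        if err != nil {
            return err
        }
        // Derive the new event from the command (domain-specific, hypothetical).
        event := deriveEvent(cmd, actorID, current+1)
        if err := store.SaveEvent(event); err == nil {
            return nil // success: the event is immutable from now on
        } else if !errors.Is(err, aether.ErrVersionConflict) {
            return err // storage failure: surface to the caller
        }
        // Version conflict: another writer won the race; reload and retry.
    }
    return fmt.Errorf("gave up after %d version conflicts", maxAttempts)
}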
Journey 2: Rebuild state from history
- Read latest snapshot (if exists)
- Replay all events since snapshot version
- Apply each event to accumulate state
- Handle corrupted events gracefully (log, skip, alert)
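A minimal sketch of this journey, assuming store implements the SnapshotStore and EventStoreWithErrors interfaces described later in this document; applyEvent is a hypothetical domain-specific reducer.
// Rebuild state: start from the latest snapshot (if any), then fold newer events.
state := map[string]interface{}{}
fromVersion := int64(1)
if snapshot, _ := store.GetLatestSnapshot(actorID); snapshot != nil {
    state = snapshot.State
    fromVersion = snapshot.Version + 1
}

// Replay with visibility into data-quality issues (log, skip, alert).
result, _ := store.GetEventsWithErrors(actorID, fromVersion)
for _, replayErr := range result.Errors {
    log.Printf("corrupted event at sequence %d: %v", replayErr.SequenceNumber, replayErr.Err)
}
for _, event := range result.Events {
    state = applyEvent(state, event) // hypothetical domain-specific fold
}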
Journey 3: Scale across nodes
- Single-node InMemory store for testing
- Multi-node JetStream store for HA and durability
- Namespace isolation for logical boundaries
Decision Points (Risks)
Decision 1: Which EventStore implementation?
- InMemory: Fast, for testing, no persistence
- JetStream: Durable, clusterable, NATS-native, production-ready
- Risk: Choosing wrong store cripples downstream systems
Decision 2: How to handle version conflicts?
- Silent retry: Hide complexity, but magic is hard to debug
- Explicit error: Force caller to decide (plausible, idempotent, or fail)
- Risk: Wrong policy loses data or causes infinite loops
Decision 3: Corrupt event handling?
- Silently skip: Data loss is invisible
- Report with context: Caller sees and can handle
- Risk: Silent data loss vs operational noise
Decision 4: Snapshot strategy?
- When to snapshot? How often?
- When to use snapshot? Only if recent enough?
- Risk: Stale snapshots give wrong state; frequent snapshots waste storage
Key Invariants from Problem Space
- Monotonic Versions: Version must be > previous version for same actor
- Append-Only: Events never deleted, modified, or reordered
- Idempotent IDs: Event ID + Actor scope must prevent duplicates (deduplication key)
- Snapshot Validity: Snapshot at version V is only valid until V+MaxEventsWithoutSnapshot new events
Aggregates
An aggregate enforces invariants through transactional boundaries. The Event Sourcing context has one core aggregate that enforces monotonic versioning.
Aggregate: ActorEventStream (Root)
Invariant Enforced:
- Version of new event must be > version of last event for this actor
- Events are immutable (enforced at storage layer)
Root Entity: ActorEventStream
- Uniquely identified by: (ActorID, Namespace)
- Holds: Current version (derived from events)
- Responsibility: Validate incoming event version against current version
Entities:
- None (this is a single-entity aggregate)
Value Objects:
- Event: Immutable fact (ID, EventType, ActorID, Version, Data, Metadata, Timestamp)
- ActorSnapshot: Point-in-time state (ActorID, Version, State, Timestamp)
- Version: Integer >= 0 representing order in stream
Lifecycle:
Created when:
- First event is saved for an actor (implicitly created by EventStore)
- No explicit "create" command—aggregates emerge from first write
Destroyed when:
- Events explicitly purged (out of scope for current design)
- ActorID becomes irrelevant (app-level decision, not library decision)
State Transitions:
- Empty → AtVersion(v1) when first event (version v1) is saved
- AtVersion(vn) → AtVersion(vn+1) when new event (version vn+1) is saved
- Transition fails if new version <= vn (VersionConflict)
Storage:
- InMemoryEventStore: In-memory map actorID → []*Event, locked with mutex
- JetStreamEventStore: NATS JetStream stream with subject {namespace}.events.{actorType}.{actorID}
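A minimal sketch of the in-memory layout, based on the fields described in the Code Analysis section below; the exact struct in store/memory.go may differ.
// Illustrative shape of the in-memory store: events are appended per actor,
// guarded by a read-write mutex.
type InMemoryEventStore struct {
    mu     sync.RWMutex        // protects events
    events map[string][]*Event // actorID → events in version order
}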
Why Single-Entity Aggregate?
Events are immutable once persisted. There is no mutable child entity. The aggregate's responsibilities are limited to:
- Track current version for an actor
- Validate new version > current version
- Reject if conflict (return error, let caller retry)
This is minimal by design—domain logic lives elsewhere.
Commands
Commands represent intent to change state. They may succeed or fail. In this context, SaveEvent is the only state-changing command on the aggregate; GetLatestVersion, GetEvents, and GetEventsWithErrors are read-side queries documented alongside it for completeness.
Command: SaveEvent
Aggregate: ActorEventStream
Intent: Persist a domain event to the event store
Input:
event *Event containing:
- ID: Unique identifier for event (idempotence key)
- EventType: Domain language (e.g., "OrderPlaced", not "Create")
- ActorID: Identity of aggregate this event belongs to
- Version: Monotonically increasing number for this actor
- Data: Domain-specific payload (map[string]interface{})
- Metadata: Optional tracing (CorrelationID, CausationID, UserID, TraceID, SpanID)
- Timestamp: When event occurred
- CommandID: ID of command that triggered this event (optional, for tracing)
Preconditions (Validation):
- Event is not nil
- Event.ID is not empty (deduplication)
- Event.ActorID is not empty
- Event.Version > 0 (versions start at 1, not 0)
- Event.Version > CurrentVersion (checked by EventStore)
- Event data is valid JSON-serializable
Postcondition (Success):
- Event is persisted to store
- No return value (just nil error)
- Event is immutable from this point forward
- Callers may treat the save as idempotent keyed on event ID (same event ID = same event)
Failure Modes:
- VersionConflictError: event.Version <= currentVersion
- Another writer won the race
- Caller must: reload latest version, recompute event with new version, retry
- Or: decide event is stale (no longer needed) and skip
- Serialization Error: Event.Data not JSON-serializable
- Caller must fix data structure before retry
- Store Error: Underlying storage failure (IO, network, etc.)
- Caller should: retry with backoff, or circuit-break
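A sketch of caller-side handling for these failure modes, using the aether.ErrVersionConflict sentinel described later in this document:
if err := store.SaveEvent(event); err != nil {
    if errors.Is(err, aether.ErrVersionConflict) {
        // Another writer won the race: reload the latest version, recompute the
        // event with the new version and retry, or skip if the command is now stale.
    } else {
        // Serialization errors require fixing the payload before retrying;
        // storage errors (IO, network) warrant retry with backoff or a circuit breaker.
    }
}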
Why Fail on Version Conflict (vs Auto-Retry)?
Auto-retry is dangerous:
- Caller doesn't know if version conflict is due to legitimate concurrent write or duplicate write
- Legitimate: Another command legitimately moved actor forward → retry with new state
- Duplicate: Same command retried → event already in stream, skip is safer than duplicate
- Library can't decide → caller must decide
Why Version Passed by Caller (Not Auto-Incremented)?
Caller knows:
- Whether event is idempotent (same ID = same command, safe to skip if already saved)
- What the expected previous version should be (optimistic concurrency control)
- Whether to retry or abandon
Auto-increment would hide this logic:
- Caller couldn't detect lost writes
- No optimistic concurrency control
- Caller would have to call GetLatestVersion in a separate operation (race condition)
Command: GetLatestVersion
Aggregate: ActorEventStream
Intent: Read current version for an actor (to prepare next SaveEvent call)
Input:
- actorID string
Output:
- version int64: Highest version seen for this actor
- error: Storage error (IO, network)
Behavior:
- Returns 0 if no events exist for actor (first event should use version 1)
- Scans all events for actor to find max version
- Cached in JetStreamEventStore (invalidated on SaveEvent of same actor)
Command: GetEvents
Aggregate: ActorEventStream
Intent: Replay events to rebuild state
Input:
- actorID string
- fromVersion int64: Starting version (inclusive)
Output:
- []*Event: Events in version order
- error: Storage error
Behavior:
- If fromVersion=0: returns all events
- If fromVersion=N: returns events where version >= N
- Silently skips malformed events (see GetEventsWithErrors for visibility)
Command: GetEventsWithErrors
Aggregate: ActorEventStream
Intent: Replay events with visibility into data quality issues
Input:
- actorID string
- fromVersion int64
Output:
- *ReplayResult containing:
  - Events []*Event: Successfully unmarshaled events
  - Errors []ReplayError: Malformed events encountered
- error: Storage error
Behavior:
- Like GetEvents but reports corrupted events instead of silently skipping
- Caller can decide: skip, log, alert, fail-fast, etc.
- ReplayError includes: SequenceNumber, RawData, UnmarshalErr
Why Separate Interface?
JetStream may encounter corrupted data (schema migration, bug, corruption). Caller needs visibility:
- Audit requirement: Know if data quality degrades
- Recovery: May be able to manually repair
- Alert: Detect silent data loss
- InMemory store: Never has corrupted data (only live objects); still implements interface for consistency
Events
Events are facts that happened. They are immutable, named in past tense, and only published after successful command execution.
Event: EventStored
Triggered by: SaveEvent command (success)
Aggregate: ActorEventStream
Data Captured:
- ID: Event identifier (for deduplication)
- ActorID: Which actor this event is about
- Version: Version number in this actor's stream
- EventType: Type of domain event (e.g., "OrderPlaced")
- Data: Domain payload
- Metadata: Tracing info (CorrelationID, CausationID, UserID, etc.)
- Timestamp: When event occurred
Published To:
- EventBus (local): Subscribers in same process
- NATSEventBus (distributed): Subscribers across cluster (via JetStream)
- Namespace isolation: Event only visible to subscribers of that namespace
Consumers:
- Event handlers in downstream contexts (e.g., Inventory, Fulfillment)
- Saga coordinators
- Projections (read models)
- Audit logs
Event: VersionConflict
Triggered by: SaveEvent command (failure due to concurrent write)
Aggregate: ActorEventStream
Data Captured:
- ActorID: Which actor experienced conflict
- AttemptedVersion: Version caller tried to save
- CurrentVersion: Version that won the race
Note: This is not published as a domain event. Instead:
- SaveEvent returns VersionConflictError
- Caller sees error and must decide what to do
- Caller may retry with new version, or give up
This is not a domain event in the strict sense (domain events are facts that happened); it is a response to a failed command.
Policies
Policies are automated reactions to events. They enforce business rules without explicit command from caller.
Policy: Version Validation on SaveEvent
Trigger: When SaveEvent is called
Action: Validate event.Version > currentVersion
Implementation:
- EventStore.SaveEvent checks: event.Version <= currentVersion?
- If true: return VersionConflictError, abort save
- If false: persist event
Why: Prevent event stream corruption from concurrent writes with same version
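A minimal sketch of this check; currentVersionFor stands in for the store-specific lookup, and the error fields follow the VersionConflictError value object documented below.
// Illustrative version check inside SaveEvent (both stores perform an
// equivalent validation under their own locks).
current := currentVersionFor(event.ActorID) // hypothetical store-specific lookup
if event.Version <= current {
    return &VersionConflictError{
        ActorID:          event.ActorID,
        AttemptedVersion: event.Version,
        CurrentVersion:   current,
    }
}
// Version is strictly greater: append (InMemory) or publish (JetStream).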
Policy: Append-Only Persistence
Trigger: When SaveEvent succeeds
Action: Event is never deleted, modified, or reordered
Implementation:
- InMemoryEventStore: Append to slice, never remove
- JetStreamEventStore: Publish to JetStream (immutable by design)
Enforcement:
- Library code has no Delete/Update/Reorder operations
- Events are value objects (if someone cloned the object, modifications don't affect stored copy)
Policy: Idempotent Event Publishing
Trigger: When SaveEvent persists event with ID X
Action: If same event ID is received again, reject as duplicate
Implementation:
- JetStreamEventStore: Publish with nats.MsgId(event.ID) → JetStream deduplicates
- InMemoryEventStore: No deduplication (testing only)
Why: Retry safety. If network fails after publish but before response, caller may retry. Dedup ensures same event isn't persisted twice.
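A sketch of that publish path using the nats.go JetStream API, assuming a JetStreamContext js and the subject layout documented earlier ({namespace}.events.{actorType}.{actorID}); variable names here are illustrative.
data, err := json.Marshal(event)
if err != nil {
    return err
}
// nats.MsgId sets the Nats-Msg-Id header; JetStream drops duplicates seen
// within its deduplication window.
subject := fmt.Sprintf("%s.events.%s.%s", namespace, actorType, event.ActorID)
_, err = js.Publish(subject, data, nats.MsgId(event.ID))
return err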
Policy: Snapshot Invalidation
Trigger: When new event is saved for an actor
Action: Snapshot is only valid until MaxEventsWithoutSnapshot new events are added
Implementation: (Not yet implemented in library—future feature)
- SaveEvent increments event count since last snapshot
- GetEvents + snapshot logic: If snapshot is recent → start replay from snapshot; if stale → replay all events
Why: Optimize replay performance without inconsistency
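A sketch of the intended freshness check (not yet implemented in the library); maxEventsWithoutSnapshot is a caller-chosen threshold.
// Decide whether the snapshot is still fresh enough to use as a replay base.
latest, _ := store.GetLatestVersion(actorID)
snapshot, _ := store.GetLatestSnapshot(actorID)

fromVersion := int64(1) // stale or missing snapshot: replay everything
if snapshot != nil && latest-snapshot.Version < maxEventsWithoutSnapshot {
    fromVersion = snapshot.Version + 1 // fresh snapshot: replay only the tail
}
events, _ := store.GetEvents(actorID, fromVersion)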
Read Models
Read models are queries with no invariants. They are projections derived from events. This context provides four read models: three served by the store interfaces (EventStore/SnapshotStore) and one by the EventBus:
Read Model: EventStream
Purpose: Answer "What is the complete history of events for actor X?"
Data:
- ActorID
- Events: List of events in version order
Source: GetEvents query
- Reads from EventStore
- Built by replaying events
- Consistent with source of truth
Query Pattern:
events, _ := store.GetEvents(actorID, 0) // All events
Read Model: CurrentVersion
Purpose: Answer "What is the latest version for actor X?"
Data:
- ActorID
- Version: Highest version number
Source: GetLatestVersion query
- Scans event stream for max version
- Cached in JetStreamEventStore for performance
Query Pattern:
version, _ := store.GetLatestVersion(actorID)
nextVersion := version + 1
Read Model: StateSnapshot
Purpose: Answer "What was the state of actor X at version V?"
Data:
- ActorID
- Version: Snapshot version
- State: Accumulated state map[string]interface{}
- Timestamp: When snapshot was taken
Source: GetLatestSnapshot query
- Reads from SnapshotStore
- Built once by domain logic, then stored
- Enables fast replay (replay only events after snapshot version)
Query Pattern:
snapshot, _ := store.GetLatestSnapshot(actorID)
// Replay from snapshot.Version + 1 onward, or from the start if no snapshot exists
fromVersion := int64(1)
if snapshot != nil {
    fromVersion = snapshot.Version + 1
}
events, _ := store.GetEvents(actorID, fromVersion)
Read Model: Namespace-Scoped Events (EventBus)
Purpose: Answer "What events happened in namespace X?"
Data:
- Namespace: Logical boundary (tenant, domain, environment)
- Events: Stream of events in this namespace
Source: EventBroadcaster.Subscribe query
- Events published by SaveEvent are distributed to subscribers
- Namespace-scoped: Event only visible to subscribers of that namespace
- Filtered: Can subscribe to specific event types or actor patterns
Query Pattern:
ch := eventBus.Subscribe("tenant-abc")
for event := range ch {
    // React to event
}
Value Objects
Value objects are immutable, attribute-defined concepts (not identity-defined). They have no lifecycle, only behavior.
Value Object: Event
Definition: Immutable fact that happened in the system
Attributes:
- ID string: Unique identifier (for deduplication within context)
- EventType string: Domain language name (e.g., "OrderPlaced")
- ActorID string: What aggregate this event concerns
- CommandID string: ID of command that caused this (optional, for tracing)
- Version int64: Order in this actor's event stream
- Data map[string]interface{}: Domain-specific payload
- Metadata map[string]string: Tracing context (CorrelationID, CausationID, UserID, TraceID, SpanID)
- Timestamp time.Time: When event occurred
Immutability:
- All fields are public (Go convention)
- Library treats as immutable: never modifies a persisted event
- Callers should treat as immutable (no library-level enforcement)
Behavior:
- SetMetadata / GetMetadata: Helper methods for metadata management
- SetCorrelationID, GetCorrelationID, etc.: Tracing helpers
- WithMetadataFrom: Copy metadata from another event (for chaining)
Why Value Object?
- Events are facts, equality is based on content not identity
- Two events with same ID, type, actor, version, data are equivalent
- Can be serialized/deserialized without losing information
Value Object: ActorSnapshot
Definition: Point-in-time state snapshot to avoid replaying entire history
Attributes:
- ActorID string: Which actor this snapshot is for
- Version int64: Snapshot was taken at this version
- State map[string]interface{}: Accumulated state
- Timestamp time.Time: When snapshot was taken
Immutability:
- Once stored, snapshot is never modified
- Creating new snapshot doesn't invalidate old one (keep both)
Why Value Object?
- Snapshot is just captured state, equality is content-based
- No lifecycle (not "create then update")
- Can be discarded and regenerated
Value Object: Version
Definition: Order in an event stream
Attributes:
int64: Non-negative integer
Semantics:
- Version must be > 0 for valid events
- Version 0 means "no events yet"
- Versions for same actor must be strictly increasing
Behavior:
- Implicit: Version comparison (is v2 > v1?)
Why Value Object?
- Version is just a number, no identity
- Equality: two versions with same number are equal
- Can appear in multiple aggregates (same actor, different contexts)
Value Object: VersionConflictError
Definition: Detailed error about version conflict
Attributes:
- ActorID string: Which actor had conflict
- AttemptedVersion int64: Version caller tried to save
- CurrentVersion int64: Version already in store
Behavior:
- Implements error interface
- Unwrap() returns sentinel ErrVersionConflict
- Caller can use errors.Is(err, aether.ErrVersionConflict) to detect it
Why Value Object?
- Error is immutable, describes a fact
- Contains enough context for caller to decide next action
Value Object: ReplayError
Definition: Details about malformed event during replay
Attributes:
- SequenceNumber uint64: Position in stream (if available)
- RawData []byte: Unparseable bytes
- Err error: Underlying unmarshal error
Behavior:
- Implements error interface
- Caller can inspect to decide: skip, log, alert, or fail
Namespace Isolation (Value Object)
Definition: Logical boundary for events (not multi-tenancy, just organization)
Scope:
- EventStore: Namespace prefixes stream name
- EventBus: Namespace patterns filter subscriptions
- Not enforced by library: caller must ensure app-level isolation
Semantics:
- InMemoryEventStore: Single namespace (no prefix)
- JetStreamEventStore: Optional namespace prefix on stream name
- EventBus: Exact namespace patterns + wildcard patterns
Example:
// Two stores, completely isolated
store1, _ := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-a")
store2, _ := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-b")
// store1 events invisible to store2
// Events in store1 go to stream "tenant-a_events"
// Events in store2 go to stream "tenant-b_events"
Code Analysis
Current Implementation vs Intended Model
Aggregate: ActorEventStream
Intended: Single-entity aggregate protecting monotonic version invariant
Current:
- EventStore interface (event.go): Contract for persistence
  - SaveEvent validates version > current (enforces invariant)
  - GetEvents returns events (read model)
  - GetLatestVersion returns max version (read model)
  - ✓ Aligns: Invariant is in SaveEvent signature/contract
- InMemoryEventStore (store/memory.go): Concrete implementation
  - mu sync.RWMutex: Protects event list
  - events map[string][]*Event: Stores events per actor
  - SaveEvent: Validates version before appending
  - ✓ Aligns: Invariant enforced in SaveEvent
- JetStreamEventStore (store/jetstream.go): Production implementation
  - mu sync.Mutex: Protects version checks
  - versions map[string]int64: Caches latest version
  - SaveEvent: Validates version in mutex critical section
  - ✓ Aligns: Invariant enforced in SaveEvent with concurrency protection
Design Decision: Why isn't ActorEventStream a concrete class?
Because Aether is a library of primitives, not a framework. The EventStore interface is the aggregate boundary:
- EventStore.SaveEvent = validate and persist
- Caller is responsible for: creating event, setting version, deciding action on conflict
- This is not an omission—it's the design: caller controls retry logic, library enforces invariant
Commands
Intended: SaveEvent, GetLatestVersion, GetEvents, GetEventsWithErrors
Current:
- SaveEvent → ✓ Aligns (explicit command)
- GetLatestVersion → ✓ Aligns (read model query)
- GetEvents → ✓ Aligns (read model query)
- GetEventsWithErrors → ✓ Aligns (read model query with error visibility)
Events
Intended: EventStored (published), VersionConflict (error response)
Current:
- Event struct (event.go): ✓ Aligns
  - Data, Metadata, Timestamp captured
  - SetMetadata helpers for tracing
  - ID for deduplication
  - Version for ordering
- EventBus (eventbus.go): ✓ Aligns
  - Publish(namespace, event) → distributes EventStored
  - Namespace-scoped subscriptions
  - Wildcard patterns for cross-namespace
  - ✓ Correct: EventStored is implicitly published by SaveEvent caller
- VersionConflictError (event.go): ✓ Aligns
  - Returned from SaveEvent on conflict
  - Contains context: ActorID, AttemptedVersion, CurrentVersion
  - Caller sees it and decides next action
Policies
Intended:
- Version Validation → ✓ Aligns (SaveEvent enforces)
- Append-Only → ✓ Aligns (no delete/update in interface)
- Idempotent Publishing → ✓ Aligns (JetStream dedup by message ID)
- Snapshot Invalidation → ✗ Missing (not yet implemented)
Read Models
Intended:
- EventStream (GetEvents) → ✓ Aligns
- CurrentVersion (GetLatestVersion) → ✓ Aligns
- StateSnapshot (GetLatestSnapshot) → ✓ Aligns
- Namespace-Scoped Events (EventBus.Subscribe) → ✓ Aligns
Snapshots
Intended: Value object, separate from events
Current:
- ActorSnapshot struct (event.go): ✓ Aligns
- SnapshotStore interface (event.go): ✓ Aligns
- SaveSnapshot, GetLatestSnapshot
- Separate from EventStore (composition via interface)
Namespace Isolation
Intended: Logical boundary, EventStore and EventBus both support
Current:
- InMemoryEventStore: No namespace support (testing only)
- JetStreamEventStore: ✓ Aligns
- NewJetStreamEventStoreWithNamespace: Convenience function
- JetStreamConfig.Namespace: Prefix on stream name
- Sanitization of subject characters
- Complete isolation at storage layer
- EventBus: ✓ Aligns
- Subscribe(namespacePattern): Exact + wildcard patterns
- Publish(namespaceID, event): Delivers to pattern-matching subscribers
- MatchNamespacePattern: NATS-style subject matching
Design Decisions & Rationale
Decision 1: Version Passed by Caller, Not Auto-Incremented
Intended Design:
// Caller responsible for versioning
currentVersion, _ := store.GetLatestVersion(actorID)
event.Version = currentVersion + 1
err := store.SaveEvent(event)
Alternative (Not Taken):
// Library increments version
event.Version = 0 // Library fills in
err := store.SaveEvent(event)
Why Caller-Controlled?
- Optimistic Concurrency Control: Caller has saved the version it expected. If another writer moved forward, caller can detect and decide:
  - Idempotent command? Skip, already done
  - Concurrent command? Merge their changes and retry
  - Conflicting? Backoff and alert
- Clarity: Version is not magic. Caller knows exactly what version they're writing.
- Idempotence: Caller can use same event ID + version pair to detect and skip duplicates.
- Flexibility: Different domains have different conflict resolution strategies. Library doesn't impose one.
Cost: Caller must manage version explicitly. But this is intentional: primitives over frameworks.
Decision 2: Fail on Conflict, Don't Retry
Intended Design:
err := store.SaveEvent(event)
if errors.Is(err, aether.ErrVersionConflict) {
    // Caller decides: retry, skip, or backoff
}
Alternative (Not Taken):
err := store.SaveEvent(event) // Always succeeds (auto-retries internally)
Why Fail Explicitly?
- Observability: Caller sees conflict and can log/metric it.
- Idempotence Safety: Auto-retry + auto-increment could turn conflict into duplicate write:
  - Attempt 1: version 2, fails (another writer got version 2)
  - Auto-retry: version 3, succeeds (but original command lost)
  - Caller doesn't know original event wasn't saved
- Control: Caller decides retry strategy:
  - Backoff + retry: For transient contention
  - Skip: For duplicate retries of same command
  - Alert: For unexpected behavior
  - Fail-fast: For critical paths
Cost: Caller must handle retries. But conflict should be rare (different aggregates, different writers).
Decision 3: Snapshots Separate from EventStore
Intended Design:
// Snapshot is optional optimization, not required
type SnapshotStore interface {
    EventStore
    SaveSnapshot(snapshot *ActorSnapshot) error
    GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
}
Alternative (Not Taken):
// Snapshot baked into EventStore interface
type EventStore interface {
    SaveEvent(event *Event) error
    GetEvents(actorID string, fromVersion int64) ([]*Event, error)
    SaveSnapshot(snapshot *ActorSnapshot) error
    GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
}
Why Separate Interface?
- Optional: Many domains don't need snapshots (small event streams).
- Composition: Caller can wrap only if needed:
  var store aether.EventStore = inmem.NewInMemoryEventStore()                 // No snapshots
  var snapshotStore aether.SnapshotStore = jsMem.NewJetStreamEventStore(...)  // With snapshots
- Clarity: Snapshot logic (when to snapshot, when to use) is domain concern, not library concern.
Cost: Caller must check interface type to access snapshots. But this is fine—snapshots are optimization, not core.
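A sketch of that interface check: snapshots are consulted only when the concrete store also implements SnapshotStore.
// Use snapshots only if the store supports them; otherwise replay the full stream.
fromVersion := int64(1)
if snapshots, ok := eventStore.(aether.SnapshotStore); ok {
    if snapshot, _ := snapshots.GetLatestSnapshot(actorID); snapshot != nil {
        fromVersion = snapshot.Version + 1
    }
}
events, _ := eventStore.GetEvents(actorID, fromVersion)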
Decision 4: Metadata Tracing Fields on Event
Intended Design:
event.SetCorrelationID(correlationID)
event.SetCausationID(causationID)
event.SetUserID(userID)
event.SetTraceID(traceID)
Why Include?
- Auditability: Know who caused what, in what order (causation chains).
- Observability: Trace IDs link to distributed tracing (OpenTelemetry).
- No Enforcement: Metadata is optional. Caller sets if needed.
- Standard Names: Metadata keys (MetadataKeyCorrelationID, etc.) are constants, not magic strings.
Cost: Event struct has extra field. But it's map[string]string, not dozens of fields.
Decision 5: Namespace Isolation via Stream Prefixing (JetStream)
Intended Design:
store, _ := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-a")
// Creates stream: "tenant-a_events"
store, _ := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-b")
// Creates stream: "tenant-b_events"
// Complete isolation: events in tenant-a invisible to tenant-b
Alternative (Not Taken):
// Single stream, subject-based filtering
// Stream "events" with subjects: "events.tenant-a.>", "events.tenant-b.>"
// But then sharing consumers between tenants is risky
Why Stream-Level Prefix?
- Strong Isolation: Each namespace has its own stream. No cross-contamination.
- Storage Guarantee: Compliance/legal: tenant data completely separated at storage layer.
- Independent Scaling: Each tenant's stream can have different retention/replicas.
- Simplicity: No subject-level filtering logic needed.
Cost: Multiple streams for multiple namespaces. But JetStream handles this efficiently.
Decision 6: Replay Errors Visible (EventStoreWithErrors Interface)
Intended Design:
type EventStoreWithErrors interface {
    EventStore
    GetEventsWithErrors(actorID string, fromVersion int64) (*ReplayResult, error)
}
// Returns both valid events and errors
result, _ := store.GetEventsWithErrors(actorID, 0)
for _, err := range result.Errors {
    log.Printf("Corrupted event at sequence %d: %v", err.SequenceNumber, err.Err)
}
Alternative (Not Taken):
// Silently skip malformed events
events, _ := store.GetEvents(actorID, 0)
// Corruption invisible
Why Expose Errors?
- Data Quality Visibility: Silent data loss is the worst. Caller must see it.
- Recovery: Operator can manually inspect and repair corrupted data.
- Alerting: Corrupted events trigger alerts (e.g., Prometheus metric).
- Compliance: Audit trail shows if/when data quality degraded.
Cost: Caller must handle errors. But better than silent data loss.
Backward Compatibility: GetEvents still silently skips (for callers that don't care). GetEventsWithErrors is opt-in.
Alignment with Vision
"Primitives Over Frameworks"
This context provides primitives:
- Event: Immutable fact
- EventStore: Interface for persistence (two implementations)
- Version: Monotonic order
- Namespace: Logical boundary
Not a framework:
- No event schema enforcement (Data is map[string]interface{})
- No command handlers (Caller handles SaveEvent)
- No projections (Caller builds read models)
- No sagas (Caller coordinates multi-aggregate transactions)
Caller composes these into their domain model.
"NATS-Native"
Event Sourcing uses:
- JetStreamEventStore: Built on JetStream, not bolted on
- NATSEventBus: Distributes events via JetStream subjects
- Namespace isolation: Uses stream naming convention, not generic filtering
"Resource Conscious"
- InMemoryEventStore: Minimal overhead (map + mutex)
- JetStreamEventStore: Leverages NATS JetStream efficiency
- No unnecessary serialization: Events stored as JSON (standard, efficient)
- Caching: Version cache in JetStreamEventStore reduces repeated lookups
"Events as Complete History"
- Append-only: Events never deleted
- Immutable: Events never modified
- Durable: JetStream persists to disk
- Replayable: Full event history available for state reconstruction
Gaps & Improvements
Gap 1: Snapshot Invalidation Policy (Not Implemented)
Current: Snapshots are never invalidated. If snapshot is stale (many events since snapshot), replay is slow.
Intended: Snapshot valid only until MaxEventsWithoutSnapshot new events are added.
Improvement:
- Add MaxVersionDelta to GetLatestSnapshot logic
- If (latestVersion - snapshot.Version) < MaxVersionDelta → use the snapshot
- If (latestVersion - snapshot.Version) >= MaxVersionDelta → replay all events
Effort: Low (logic in GetEvents + snapshot)
Gap 2: Bulk Operations (Not Implemented)
Current: SaveEvent is one event at a time. No transaction for multiple events.
Intended: SaveMultipleEvents for atomic saves of related events.
Improvement:
- Add SaveMultipleEvents(actorID, events) to EventStore interface
- Validates all versions are consecutive + greater than current
- Saves all or none (atomic)
- Useful for: snapshot + first event after, or replayed events
Effort: Medium (requires careful concurrency handling in JetStream)
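A hypothetical shape for this addition, using the SaveMultipleEvents name suggested above; nothing below exists in the library yet.
// Hypothetical extension interface for atomic multi-event saves.
type BulkEventStore interface {
    aether.EventStore

    // SaveMultipleEvents persists all events for one actor atomically.
    // Versions must be consecutive and strictly greater than the actor's
    // current version; on any violation, nothing is persisted.
    SaveMultipleEvents(actorID string, events []*aether.Event) error
}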
Gap 3: Event Schema Evolution (Not Addressed)
Current: Event.Data is map[string]interface{}. No type safety, no schema validation.
Intended: Caller responsible for versioning (e.g., v1/v2 fields in Data map).
Improvement: (Probably out of scope for Event Sourcing context)
- Add EventType versioning (e.g., "OrderPlacedV1", "OrderPlacedV2")
- Or: Add field in Data (e.g., Data["_schema"] = "v2")
- Document best practices for schema evolution
Effort: High (requires design + examples)
Gap 4: Event Deduplication on Save (Not Implemented)
Current: Caller must avoid sending same event ID twice (dedup is up to them).
Intended: EventStore rejects duplicate event IDs for same actor.
Improvement:
- Track event IDs per actor
- SaveEvent: Check if event ID already exists for actor
- If yes: Return VersionConflictError or DuplicateEventError
- InMemoryEventStore: Add idMap; JetStreamEventStore already has dedup via message ID
Effort: Low (simple map check)
Gap 5: Time Travel Queries (Not Implemented)
Current: Can only replay forward (from version N onward).
Intended: Query state at specific point in time or version.
Improvement: (Probably out of scope)
- Add GetEventsUntilVersion(actorID, version) to get events up to version N
- Caller can implement "state at timestamp T" by filtering
Effort: Low (small addition to EventStore interface)
Gap 6: Distributed Tracing Integration (Partial)
Current: Metadata fields for TraceID/SpanID, but no automatic integration.
Intended: Automatic trace context propagation.
Improvement:
- Add helper to extract trace context from context.Context
- SaveEvent extracts TraceID/SpanID from context and sets on event
- Or: Metrics publisher emits spans for event persistence
Effort: Medium (OpenTelemetry integration)
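A sketch of such a helper using the OpenTelemetry Go API (go.opentelemetry.io/otel/trace); the metadata keys and the SetMetadata(key, value) signature are assumptions, not the library's documented constants.
// Copy the active trace context from ctx onto the event's metadata.
func attachTraceContext(ctx context.Context, event *aether.Event) {
    sc := trace.SpanFromContext(ctx).SpanContext()
    if !sc.IsValid() {
        return // no active span: leave metadata untouched
    }
    event.SetMetadata("trace_id", sc.TraceID().String()) // key names are illustrative
    event.SetMetadata("span_id", sc.SpanID().String())
}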
Testing Considerations
Unit Tests
EventStore contract tests (to verify implementations):
- SaveEvent rejects version <= current
- SaveEvent accepts version > current
- GetLatestVersion returns max of all events
- GetEvents filters by fromVersion
- GetEventsWithErrors separates good events from corrupted ones
- Idempotent event ID → no duplicate (if implemented)
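A sketch of a store-agnostic contract test for the first two properties, assuming Go's testing package and the inmem.NewInMemoryEventStore constructor referenced elsewhere in this document:
func TestSaveEventEnforcesMonotonicVersions(t *testing.T) {
    s := inmem.NewInMemoryEventStore()

    // A first write at version 1 must be accepted.
    first := &aether.Event{ID: "evt-1", ActorID: "actor-1", EventType: "OrderPlaced", Version: 1}
    if err := s.SaveEvent(first); err != nil {
        t.Fatalf("expected version 1 to be accepted, got: %v", err)
    }

    // A write that does not advance the version must be rejected.
    stale := &aether.Event{ID: "evt-2", ActorID: "actor-1", EventType: "OrderPlaced", Version: 1}
    if err := s.SaveEvent(stale); !errors.Is(err, aether.ErrVersionConflict) {
        t.Fatalf("expected ErrVersionConflict, got: %v", err)
    }
}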
Value object tests:
- Event immutability (no public setters)
- Version comparison
- Metadata helpers (SetMetadata, GetMetadata, etc.)
Integration Tests
EventBus + EventStore + Namespace isolation:
- Publish to namespace A, verify only namespace A subscribers receive
- Wildcard subscription receives from multiple namespaces
- Event filtering (EventTypes, ActorPattern) works correctly
JetStream specifics:
- Multiple instances of JetStreamEventStore with different namespaces don't interfere
- Version cache invalidates correctly on SaveEvent
- Corrupted events in JetStream trigger ReplayError
Brownfield Migration
If integrating Event Sourcing into existing system:
- Start with InMemoryEventStore in tests
- Add JetStreamEventStore for integration tests (with NATS running)
- Gradually migrate command handlers to use SaveEvent
- Add EventBus subscribers for downstream contexts
- Monitor VersionConflict errors to understand contention
References
Key Files
- /Users/hugo.nijhuis/src/github/flowmade-one/aether/event.go: Event, EventStore, VersionConflictError, ActorSnapshot, SnapshotStore, EventStoreWithErrors
- /Users/hugo.nijhuis/src/github/flowmade-one/aether/store/memory.go: InMemoryEventStore
- /Users/hugo.nijhuis/src/github/flowmade-one/aether/store/jetstream.go: JetStreamEventStore with namespace support
- /Users/hugo.nijhuis/src/github/flowmade-one/aether/eventbus.go: EventBus, EventBroadcaster, SubscriptionFilter
- /Users/hugo.nijhuis/src/github/flowmade-one/aether/pattern.go: Namespace pattern matching, actor pattern matching
Related Contexts
- Clustering: Uses EventStore + EventBus for distributed coordination (leader election, shard assignment)
- Event Bus (NATS): Extends EventBus with cross-node distribution via NATS JetStream
- Actor Model (downstream): Uses EventStore to persist actor state changes and broadcast via EventBus
Summary: Invariants, Aggregates, Commands, Events
| Artifact | Name | Details |
|---|---|---|
| Invariant 1 | Monotonic Versions | Version > previous version for same actor |
| Invariant 2 | Append-Only | Events never deleted or modified |
| Aggregate | ActorEventStream | Root: actor + current version; enforces monotonic versions |
| Command 1 | SaveEvent | Persist event; fail if version conflict |
| Command 2 | GetLatestVersion | Read current version (for optimistic concurrency) |
| Command 3 | GetEvents | Replay events from version N |
| Command 4 | GetEventsWithErrors | Replay with visibility into corrupted events |
| Event 1 | EventStored (implicit) | Published to EventBus after SaveEvent succeeds |
| Event 2 | VersionConflict (error response) | Returned from SaveEvent on conflict |
| Policy 1 | Version Validation | SaveEvent enforces version > current |
| Policy 2 | Append-Only | No delete/update in EventStore interface |
| Policy 3 | Idempotent Publishing | JetStream dedup by event ID |
| Read Model 1 | EventStream | GetEvents returns list of events |
| Read Model 2 | CurrentVersion | GetLatestVersion returns max version |
| Read Model 3 | StateSnapshot | GetLatestSnapshot returns latest snapshot |
| Read Model 4 | Namespace-Scoped Events | EventBus.Subscribe delivers matching events |