aether/.product-strategy/DOMAIN_MODEL_EVENT_SOURCING.md
Hugo Nijhuis 271f5db444
Move product strategy documentation to .product-strategy directory
Organize all product strategy and domain modeling documentation into a
dedicated .product-strategy directory for better separation from code.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-12 23:57:20 +01:00


Domain Model: Event Sourcing

Summary

The Event Sourcing bounded context is Aether's core responsibility: persist events as an immutable, append-only source of truth and enable state reconstruction through replay. This context enforces two critical invariants:

  1. Monotonic Versioning: Each actor's event stream must have strictly increasing version numbers, so two concurrent writers cannot both append at the same position; this is what enables optimistic concurrency control.
  2. Append-Only Persistence: Events, once written, are immutable and never deleted or modified—they form an authoritative audit trail.

This context does not model domain-specific business logic (that is the responsibility of downstream bounded contexts); instead, it provides composable primitives for any domain to build event-sourced systems. State is always derived by replaying events, never stored directly.


Problem Space

User Journeys

Journey 1: Store an event

  • Actor initiates change (command)
  • System derives new event
  • Writer attempts to persist with next version
  • If version conflict → writer lost race, must reload and retry
  • If success → event immutable forever

Journey 2: Rebuild state from history

  • Read latest snapshot (if exists)
  • Replay all events since snapshot version
  • Apply each event to accumulate state
  • Handle corrupted events gracefully (log, skip, alert)
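A sketch of Journey 2 as a fold, assuming a hypothetical apply step and trimmed local stand-ins for Event and ActorSnapshot. Events the fold cannot interpret are collected and returned rather than skipped silently, which leaves the log, skip, or alert choice to the caller.

```go
package main

import "fmt"

// Local stand-ins for Event and ActorSnapshot, trimmed to what the fold needs.
type Event struct {
	Version   int64
	EventType string
	Data      map[string]interface{}
}

type ActorSnapshot struct {
	Version int64
	State   map[string]interface{}
}

// apply is a hypothetical domain fold step; it reports events it cannot interpret.
func apply(state map[string]interface{}, e *Event) error {
	switch e.EventType {
	case "ItemAdded":
		qty, ok := e.Data["qty"].(int)
		if !ok {
			return fmt.Errorf("v%d: qty missing or not an int", e.Version)
		}
		total, _ := state["items"].(int)
		state["items"] = total + qty
		return nil
	default:
		return fmt.Errorf("v%d: unknown event type %q", e.Version, e.EventType)
	}
}

// rebuild starts from the snapshot (if any), replays only later events, and
// returns the events it could not apply instead of dropping them silently.
func rebuild(snap *ActorSnapshot, events []*Event) (map[string]interface{}, int64, []error) {
	state := map[string]interface{}{}
	var version int64
	if snap != nil {
		for k, v := range snap.State {
			state[k] = v // shallow copy, so writes to state do not touch the snapshot's map
		}
		version = snap.Version
	}
	var skipped []error
	for _, e := range events {
		if e.Version <= version {
			continue // already folded into the snapshot
		}
		version = e.Version // the stream position advances even if apply fails
		if err := apply(state, e); err != nil {
			skipped = append(skipped, err) // caller decides: log, alert, or fail
		}
	}
	return state, version, skipped
}

func main() {
	snap := &ActorSnapshot{Version: 2, State: map[string]interface{}{"items": 5}}
	events := []*Event{
		{Version: 2, EventType: "ItemAdded", Data: map[string]interface{}{"qty": 5}},
		{Version: 3, EventType: "ItemAdded", Data: map[string]interface{}{"qty": 1}},
		{Version: 4, EventType: "Garbled"},
	}
	state, v, skipped := rebuild(snap, events)
	fmt.Println(state["items"], v, len(skipped)) // 6 4 1
}
```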

Journey 3: Scale across nodes

  • Single-node InMemory store for testing
  • Multi-node JetStream store for HA and durability
  • Namespace isolation for logical boundaries

Decision Points (Risks)

Decision 1: Which EventStore implementation?

  • InMemory: Fast, for testing, no persistence
  • JetStream: Durable, clusterable, NATS-native, production-ready
  • Risk: Choosing wrong store cripples downstream systems

Decision 2: How to handle version conflicts?

  • Silent retry: Hide complexity, but magic is hard to debug
  • Explicit error: Force caller to decide (retry, skip as duplicate, or fail)
  • Risk: Wrong policy loses data or causes infinite loops

Decision 3: Corrupt event handling?

  • Silently skip: Data loss is invisible
  • Report with context: Caller sees and can handle
  • Risk: Silent data loss vs operational noise

Decision 4: Snapshot strategy?

  • When to snapshot? How often?
  • When to use snapshot? Only if recent enough?
  • Risk: Using a stale snapshot without replaying later events gives wrong state; frequent snapshots waste storage

Key Invariants from Problem Space

  1. Monotonic Versions: Version must be > previous version for same actor
  2. Append-Only: Events never deleted, modified, or reordered
  3. Idempotent IDs: Event ID + Actor scope must prevent duplicates (deduplication key)
  4. Snapshot Validity: A snapshot at version V is only valid while fewer than MaxEventsWithoutSnapshot new events have been appended after V

Aggregates

An aggregate enforces invariants through transactional boundaries. The Event Sourcing context has one core aggregate that enforces monotonic versioning.

Aggregate: ActorEventStream (Root)

Invariant Enforced:

  • Version of new event must be > version of last event for this actor
  • Events are immutable (enforced at storage layer)

Root Entity: ActorEventStream

  • Uniquely identified by: (ActorID, Namespace)
  • Holds: Current version (derived from events)
  • Responsibility: Validate incoming event version against current version

Entities:

  • None (this is a single-entity aggregate)

Value Objects:

  • Event: Immutable fact (ID, EventType, ActorID, Version, Data, Metadata, Timestamp)
  • ActorSnapshot: Point-in-time state (ActorID, Version, State, Timestamp)
  • Version: Integer >= 0 representing order in stream

Lifecycle:

Created when:

  • First event is saved for an actor (implicitly created by EventStore)
  • No explicit "create" command—aggregates emerge from first write

Destroyed when:

  • Events explicitly purged (out of scope for current design)
  • ActorID becomes irrelevant (app-level decision, not library decision)

State Transitions:

  • Empty → AtVersion(v1) when first event (version v1) is saved
  • AtVersion(vn) → AtVersion(vn+1) when new event (version vn+1) is saved
  • Transition fails if new version <= vn (VersionConflict)

Storage:

  • InMemoryEventStore: In-memory map actorID → []*Event, locked with mutex
  • JetStreamEventStore: NATS JetStream stream with subject {namespace}.events.{actorType}.{actorID}

Why Single-Entity Aggregate?

Events are immutable once persisted. There is no mutable child entity. The aggregate's only responsibility is:

  1. Track current version for an actor
  2. Validate new version > current version
  3. Reject if conflict (return error, let caller retry)

This is minimal by design—domain logic lives elsewhere.


Commands

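Commands represent intent to change state. They may succeed or fail. In this context, only one command changes the aggregate: SaveEvent. GetLatestVersion, GetEvents and GetEventsWithErrors follow it here but are read-only queries.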

Command: SaveEvent

Aggregate: ActorEventStream

Intent: Persist a domain event to the event store

Input:

  • event *Event containing:
    • ID: Unique identifier for event (idempotence key)
    • EventType: Domain language (e.g., "OrderPlaced", not "Create")
    • ActorID: Identity of aggregate this event belongs to
    • Version: Monotonically increasing number for this actor
    • Data: Domain-specific payload (map[string]interface{})
    • Metadata: Optional tracing (CorrelationID, CausationID, UserID, TraceID, SpanID)
    • Timestamp: When event occurred
    • CommandID: ID of command that triggered this event (optional, for tracing)

Preconditions (Validation):

  • Event is not nil
  • Event.ID is not empty (deduplication)
  • Event.ActorID is not empty
  • Event.Version > 0 (versions start at 1, not 0)
  • Event.Version > CurrentVersion (checked by EventStore)
  • Event.Data is JSON-serializable
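The store-independent preconditions can be checked before SaveEvent is called. In this sketch validateEvent is a hypothetical helper, and JSON-serializability is tested by simply calling json.Marshal on Data; the Version > CurrentVersion check is left to the store, because it needs the current version under a lock.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type Event struct {
	ID      string
	ActorID string
	Version int64
	Data    map[string]interface{}
}

// validateEvent checks the preconditions that do not need the store.
func validateEvent(e *Event) error {
	switch {
	case e == nil:
		return errors.New("event is nil")
	case e.ID == "":
		return errors.New("event ID is empty")
	case e.ActorID == "":
		return errors.New("actor ID is empty")
	case e.Version <= 0:
		return fmt.Errorf("version must be >= 1, got %d", e.Version)
	}
	// Channels, funcs and similar values make Marshal fail.
	if _, err := json.Marshal(e.Data); err != nil {
		return fmt.Errorf("data is not JSON-serializable: %w", err)
	}
	return nil
}

func main() {
	bad := &Event{ID: "e1", ActorID: "a1", Version: 1, Data: map[string]interface{}{"ch": make(chan int)}}
	fmt.Println(validateEvent(bad) != nil) // true
}
```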

Postcondition (Success):

  • Event is persisted to store
  • No return value (just nil error)
  • Event is immutable from this point forward
  • Retries with the same event ID are meant to be idempotent (JetStreamEventStore deduplicates by ID; InMemoryEventStore does not)

Failure Modes:

  • VersionConflictError: event.Version <= currentVersion
    • Another writer won the race
    • Caller must: reload latest version, recompute event with new version, retry
    • Or: decide event is stale (no longer needed) and skip
  • Serialization Error: Event.Data not JSON-serializable
    • Caller must fix data structure before retry
  • Store Error: Underlying storage failure (IO, network, etc.)
    • Caller should: retry with backoff, or circuit-break

Why Fail on Version Conflict (vs Auto-Retry)?

Auto-retry is dangerous:

  • The store cannot tell whether a version conflict comes from a legitimate concurrent write or from a duplicate write
  • Legitimate: Another command legitimately moved actor forward → retry with new state
  • Duplicate: Same command retried → event already in stream, skip is safer than duplicate
  • Library can't decide → caller must decide

Why Version Passed by Caller (Not Auto-Incremented)?

Caller knows:

  • Whether event is idempotent (same ID = same command, safe to skip if already saved)
  • What the expected previous version should be (optimistic concurrency control)
  • Whether to retry or abandon

Auto-increment would hide this logic:

  • Caller couldn't detect lost writes
  • No optimistic concurrency control
  • Caller would have to call GetLatestVersion in a separate operation to learn which version was assigned, and another writer could append in between (race condition)

Query: GetLatestVersion

Aggregate: ActorEventStream

Intent: Read current version for an actor (to prepare next SaveEvent call)

Input:

  • actorID string

Output:

  • version int64: Highest version seen for this actor
  • error: Storage error (IO, network)

Behavior:

  • Returns 0 if no events exist for actor (first event should use version 1)
  • Scans all events for actor to find max version
  • Cached in JetStreamEventStore (invalidated on SaveEvent of same actor)
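The cached lookup can be sketched as follows, assuming a hypothetical versionCache type rather than JetStreamEventStore's real internals. A cache miss scans the actor's events; recording a new event invalidates only that actor's entry, matching the behavior listed above. SaveEvent's version check is omitted to keep the focus on caching.

```go
package main

import (
	"fmt"
	"sync"
)

type Event struct {
	ActorID string
	Version int64
}

// versionCache scans once per actor and reuses the result until that actor
// is written to again. scans counts full scans for illustration.
type versionCache struct {
	mu     sync.Mutex
	events map[string][]Event // stand-in for the underlying stream
	cache  map[string]int64
	scans  int
}

func newVersionCache() *versionCache {
	return &versionCache{events: map[string][]Event{}, cache: map[string]int64{}}
}

func (c *versionCache) GetLatestVersion(actorID string) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.cache[actorID]; ok {
		return v
	}
	c.scans++
	var highest int64 // 0 means "no events yet"; the first event uses version 1
	for _, e := range c.events[actorID] {
		if e.Version > highest {
			highest = e.Version
		}
	}
	c.cache[actorID] = highest
	return highest
}

// record appends an event and invalidates only that actor's cache entry.
func (c *versionCache) record(e Event) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.events[e.ActorID] = append(c.events[e.ActorID], e)
	delete(c.cache, e.ActorID)
}

func main() {
	c := newVersionCache()
	fmt.Println(c.GetLatestVersion("a")) // 0
	c.record(Event{ActorID: "a", Version: 1})
	c.GetLatestVersion("a")      // cache miss: rescans after invalidation
	v := c.GetLatestVersion("a") // cache hit
	fmt.Println(v, c.scans)      // 1 2
}
```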

Query: GetEvents

Aggregate: ActorEventStream

Intent: Replay events to rebuild state

Input:

  • actorID string
  • fromVersion int64: Starting version (inclusive)

Output:

  • []*Event: Events in version order
  • error: Storage error

Behavior:

  • If fromVersion=0: returns all events
  • If fromVersion=N: returns events where version >= N
  • Silently skips malformed events (see GetEventsWithErrors for visibility)
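Because versions in one actor's stream are strictly increasing, the fromVersion filter can use a binary search. This eventsFrom helper is a hypothetical in-memory sketch; a JetStream-backed store would read from the stream instead.

```go
package main

import (
	"fmt"
	"sort"
)

type Event struct {
	Version int64
}

// eventsFrom returns the events with Version >= fromVersion (inclusive).
// The stream is sorted by version, so sort.Search finds the first match;
// fromVersion 0 therefore returns every event.
func eventsFrom(stream []*Event, fromVersion int64) []*Event {
	i := sort.Search(len(stream), func(i int) bool { return stream[i].Version >= fromVersion })
	out := make([]*Event, len(stream)-i)
	// Copy so a caller's append cannot write into the store's backing array;
	// the *Event pointers themselves are still shared.
	copy(out, stream[i:])
	return out
}

func main() {
	stream := []*Event{{Version: 1}, {Version: 2}, {Version: 3}}
	fmt.Println(len(eventsFrom(stream, 0)), len(eventsFrom(stream, 2)), len(eventsFrom(stream, 9))) // 3 2 0
}
```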

Query: GetEventsWithErrors

Aggregate: ActorEventStream

Intent: Replay events with visibility into data quality issues

Input:

  • actorID string
  • fromVersion int64

Output:

  • *ReplayResult containing:
    • Events []*Event: Successfully unmarshaled events
    • Errors []ReplayError: Malformed events encountered
  • error: Storage error

Behavior:

  • Like GetEvents but reports corrupted events instead of silently skipping
  • Caller can decide: skip, log, alert, fail-fast, etc.
  • ReplayError includes: SequenceNumber, RawData, Err (the unmarshal error)
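The difference between GetEvents and GetEventsWithErrors comes down to what happens when a stored message fails to unmarshal. This sketch decodes raw JSON messages into a ReplayResult; the JSON field tags and the decodeAll helper are assumptions, not the library's wire format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Event struct {
	ID        string                 `json:"id"`
	EventType string                 `json:"eventType"`
	ActorID   string                 `json:"actorId"`
	Version   int64                  `json:"version"`
	Data      map[string]interface{} `json:"data"`
}

// ReplayError mirrors the fields listed in this document.
type ReplayError struct {
	SequenceNumber uint64
	RawData        []byte
	Err            error
}

func (e ReplayError) Error() string {
	return fmt.Sprintf("sequence %d: %v", e.SequenceNumber, e.Err)
}

type ReplayResult struct {
	Events []*Event
	Errors []ReplayError
}

// decodeAll turns raw stored messages into events, recording (not dropping)
// every message that fails to unmarshal. seqs[i] is the stream position of raw[i].
func decodeAll(raw [][]byte, seqs []uint64) *ReplayResult {
	res := &ReplayResult{}
	for i, msg := range raw {
		var e Event // fresh variable per iteration, so &e is distinct each time
		if err := json.Unmarshal(msg, &e); err != nil {
			res.Errors = append(res.Errors, ReplayError{SequenceNumber: seqs[i], RawData: msg, Err: err})
			continue
		}
		res.Events = append(res.Events, &e)
	}
	return res
}

func main() {
	raw := [][]byte{
		[]byte(`{"id":"e1","actorId":"a1","version":1}`),
		[]byte(`{"id":"e2","actorId":`), // truncated message
	}
	res := decodeAll(raw, []uint64{10, 11})
	fmt.Println(len(res.Events), len(res.Errors), res.Errors[0].SequenceNumber) // 1 1 11
}
```

GetEvents would correspond to returning only res.Events; GetEventsWithErrors hands the whole result to the caller.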

Why Separate Interface?

JetStream may encounter corrupted data (schema migration, bug, corruption). Caller needs visibility:

  • Audit requirement: Know if data quality degrades
  • Recovery: May be able to manually repair
  • Alert: Detect silent data loss
  • InMemory store: Never has corrupted data (only live objects); still implements interface for consistency

Events

Events are facts that happened. They are immutable, named in past tense, and only published after successful command execution.

Event: EventStored

Triggered by: SaveEvent command (success)

Aggregate: ActorEventStream

Data Captured:

  • ID: Event identifier (for deduplication)
  • ActorID: Which actor this event is about
  • Version: Version number in this actor's stream
  • EventType: Type of domain event (e.g., "OrderPlaced")
  • Data: Domain payload
  • Metadata: Tracing info (CorrelationID, CausationID, UserID, etc.)
  • Timestamp: When event occurred

Published To:

  • EventBus (local): Subscribers in same process
  • NATSEventBus (distributed): Subscribers across cluster (via JetStream)
  • Namespace isolation: Event only visible to subscribers of that namespace

Consumers:

  • Event handlers in downstream contexts (e.g., Inventory, Fulfillment)
  • Saga coordinators
  • Projections (read models)
  • Audit logs

Event: VersionConflict

Triggered by: SaveEvent command (failure due to concurrent write)

Aggregate: ActorEventStream

Data Captured:

  • ActorID: Which actor experienced conflict
  • AttemptedVersion: Version caller tried to save
  • CurrentVersion: Version that won the race

Note: This is not published as a domain event. Instead:

  • SaveEvent returns VersionConflictError
  • Caller sees error and must decide what to do
  • Caller may retry with new version, or give up

VersionConflict is therefore not a domain event (domain events are facts) but a response to a failed command.


Policies

Policies are automated reactions to events. They enforce business rules without explicit command from caller.

Policy: Version Validation on SaveEvent

Trigger: When SaveEvent is called

Action: Validate event.Version > currentVersion

Implementation:

  • EventStore.SaveEvent checks: event.Version <= currentVersion?
  • If true: return VersionConflictError, abort save
  • If false: persist event

Why: Prevent event stream corruption from concurrent writes with same version


Policy: Append-Only Persistence

Trigger: When SaveEvent succeeds

Action: Event is never deleted, modified, or reordered

Implementation:

  • InMemoryEventStore: Append to slice, never remove
  • JetStreamEventStore: Publish to JetStream (immutable by design)

Enforcement:

  • Library code has no Delete/Update/Reorder operations
  • Events are value objects: a caller that needs a changed event works on a copy, and changes to a copy never affect the stored event (immutability is by convention, not enforced)

Policy: Idempotent Event Publishing

Trigger: When SaveEvent persists event with ID X

Action: If the same event ID is received again, treat it as a duplicate and do not persist it a second time

Implementation:

  • JetStreamEventStore: Publish with nats.MsgId(event.ID) → JetStream deduplicates messages with the same ID within the stream's duplicate window (2 minutes by default)
  • InMemoryEventStore: No deduplication (testing only)

Why: Retry safety. If network fails after publish but before response, caller may retry. Dedup ensures same event isn't persisted twice.


Policy: Snapshot Invalidation

Trigger: When new event is saved for an actor

Action: Snapshot is only valid until MaxEventsWithoutSnapshot new events are added

Implementation: (Not yet implemented in library—future feature)

  • SaveEvent increments event count since last snapshot
  • GetEvents + snapshot logic: If snapshot is recent → start replay from snapshot; if stale → replay all events

Why: Optimize replay performance without inconsistency


Read Models

Read models are queries with no invariants. They are projections derived from events. EventStore, SnapshotStore and EventBus together provide four read models:

Read Model: EventStream

Purpose: Answer "What is the complete history of events for actor X?"

Data:

  • ActorID
  • Events: List of events in version order

Source: GetEvents query

  • Reads from EventStore
  • Built by replaying events
  • Consistent with source of truth

Query Pattern:

events, _ := store.GetEvents(actorID, 0)  // All events

Read Model: CurrentVersion

Purpose: Answer "What is the latest version for actor X?"

Data:

  • ActorID
  • Version: Highest version number

Source: GetLatestVersion query

  • Scans event stream for max version
  • Cached in JetStreamEventStore for performance

Query Pattern:

version, _ := store.GetLatestVersion(actorID)
nextVersion := version + 1

Read Model: StateSnapshot

Purpose: Answer "What was the state of actor X at version V?"

Data:

  • ActorID
  • Version: Snapshot version
  • State: Accumulated state map[string]interface{}
  • Timestamp: When snapshot was taken

Source: GetLatestSnapshot query

  • Reads from SnapshotStore
  • Built once by domain logic, then stored
  • Enables fast replay (replay only events after snapshot version)

Query Pattern:

snapshot, _ := store.GetLatestSnapshot(actorID)
// Replay from snapshot.Version + 1 onward; without a snapshot, replay all events
fromVersion := int64(0)
if snapshot != nil {
    fromVersion = snapshot.Version + 1
}
events, _ := store.GetEvents(actorID, fromVersion)

Read Model: Namespace-Scoped Events (EventBus)

Purpose: Answer "What events happened in namespace X?"

Data:

  • Namespace: Logical boundary (tenant, domain, environment)
  • Events: Stream of events in this namespace

Source: EventBroadcaster.Subscribe query

  • Events published to the bus after a successful SaveEvent are distributed to subscribers
  • Namespace-scoped: Event only visible to subscribers of that namespace
  • Filtered: Can subscribe to specific event types or actor patterns

Query Pattern:

ch := eventBus.Subscribe("tenant-abc")
for event := range ch {
    // React to event
}
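Namespace patterns are described as NATS-style subject matching. A hypothetical matcher under the usual NATS rules (dot-separated tokens, `*` matches exactly one token, a trailing `>` matches one or more remaining tokens) could look like this; aether's MatchNamespacePattern may handle edge cases differently.

```go
package main

import (
	"fmt"
	"strings"
)

// matchNamespace is a hypothetical NATS-style matcher.
func matchNamespace(pattern, namespace string) bool {
	p := strings.Split(pattern, ".")
	n := strings.Split(namespace, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" is only valid as the last token and needs at least one token to match.
			return i == len(p)-1 && len(n) > i
		}
		if i >= len(n) {
			return false
		}
		if tok != "*" && tok != n[i] {
			return false
		}
	}
	return len(p) == len(n)
}

func main() {
	fmt.Println(matchNamespace("tenant-abc", "tenant-abc"))  // true
	fmt.Println(matchNamespace("tenant-*", "tenant-abc"))    // false: "*" is a whole token, not a glob
	fmt.Println(matchNamespace("prod.*", "prod.tenant-abc")) // true
	fmt.Println(matchNamespace("prod.>", "prod.eu.tenant"))  // true
	fmt.Println(matchNamespace("prod.>", "prod"))            // false
}
```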

Value Objects

Value objects are immutable, attribute-defined concepts (not identity-defined). They have no lifecycle, only behavior.

Value Object: Event

Definition: Immutable fact that happened in the system

Attributes:

  • ID string: Unique identifier (for deduplication within context)
  • EventType string: Domain language name (e.g., "OrderPlaced")
  • ActorID string: What aggregate this event concerns
  • CommandID string: ID of command that caused this (optional, for tracing)
  • Version int64: Order in this actor's event stream
  • Data map[string]interface{}: Domain-specific payload
  • Metadata map[string]string: Tracing context (CorrelationID, CausationID, UserID, TraceID, SpanID)
  • Timestamp time.Time: When event occurred

Immutability:

  • All fields are exported (public)
  • Library treats as immutable: never modifies a persisted event
  • Callers should treat as immutable (no library-level enforcement)

Behavior:

  • SetMetadata / GetMetadata: Helper methods for metadata management
  • SetCorrelationID, GetCorrelationID, etc.: Tracing helpers
  • WithMetadataFrom: Copy metadata from another event (for chaining)
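A sketch of how such helpers can sit on top of the Metadata map. The method names follow this section, but the key string values and the method bodies are assumptions; the point is that the map is allocated lazily and WithMetadataFrom copies entries, so chained events never share one map.

```go
package main

import "fmt"

// Hypothetical key values; the library defines its own MetadataKey* constants.
const (
	MetadataKeyCorrelationID = "correlation_id"
	MetadataKeyCausationID   = "causation_id"
)

type Event struct {
	ID       string
	Metadata map[string]string
}

// SetMetadata allocates the map on first use, so a zero-value Event works.
func (e *Event) SetMetadata(key, value string) {
	if e.Metadata == nil {
		e.Metadata = make(map[string]string)
	}
	e.Metadata[key] = value
}

// GetMetadata returns "" for missing keys; reading a nil map is safe in Go.
func (e *Event) GetMetadata(key string) string { return e.Metadata[key] }

func (e *Event) SetCorrelationID(id string) { e.SetMetadata(MetadataKeyCorrelationID, id) }
func (e *Event) GetCorrelationID() string   { return e.GetMetadata(MetadataKeyCorrelationID) }

// WithMetadataFrom copies src's entries into e's own map and returns e.
func (e *Event) WithMetadataFrom(src *Event) *Event {
	for k, v := range src.Metadata {
		e.SetMetadata(k, v)
	}
	return e
}

func main() {
	first := &Event{ID: "e1"}
	first.SetCorrelationID("req-42")
	next := (&Event{ID: "e2"}).WithMetadataFrom(first)
	next.SetMetadata(MetadataKeyCausationID, first.ID) // e2 was caused by e1
	fmt.Println(next.GetCorrelationID(), next.GetMetadata(MetadataKeyCausationID)) // req-42 e1
}
```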

Why Value Object?

  • Events are facts; equality is based on content, not identity
  • Two events with same ID, type, actor, version, data are equivalent
  • Can be serialized/deserialized without losing information

Value Object: ActorSnapshot

Definition: Point-in-time state snapshot to avoid replaying entire history

Attributes:

  • ActorID string: Which actor this snapshot is for
  • Version int64: Snapshot was taken at this version
  • State map[string]interface{}: Accumulated state
  • Timestamp time.Time: When snapshot was taken

Immutability:

  • Once stored, snapshot is never modified
  • Creating new snapshot doesn't invalidate old one (keep both)

Why Value Object?

  • Snapshot is just captured state; equality is content-based
  • No lifecycle (not "create then update")
  • Can be discarded and regenerated

Value Object: Version

Definition: Order in an event stream

Attributes:

  • int64: Non-negative integer

Semantics:

  • Version must be > 0 for valid events
  • Version 0 means "no events yet"
  • Versions for same actor must be strictly increasing

Behavior:

  • Implicit: Version comparison (is v2 > v1?)

Why Value Object?

  • Version is just a number, no identity
  • Equality: two versions with same number are equal
  • Can appear in multiple aggregates (same actor, different contexts)

Value Object: VersionConflictError

Definition: Detailed error about version conflict

Attributes:

  • ActorID string: Which actor had conflict
  • AttemptedVersion int64: Version caller tried to save
  • CurrentVersion int64: Version already in store

Behavior:

  • Implements error interface
  • Unwrap() returns sentinel ErrVersionConflict
  • Caller can use errors.Is(err, aether.ErrVersionConflict) to detect

Why Value Object?

  • Error is immutable, describes a fact
  • Contains enough context for caller to decide next action

Value Object: ReplayError

Definition: Details about malformed event during replay

Attributes:

  • SequenceNumber uint64: Position in stream (if available)
  • RawData []byte: Unparseable bytes
  • Err error: Underlying unmarshal error

Behavior:

  • Implements error interface
  • Caller can inspect to decide: skip, log, alert, or fail

Namespace Isolation (Value Object)

Definition: Logical boundary for events (not multi-tenancy, just organization)

Scope:

  • EventStore: Namespace prefixes stream name
  • EventBus: Namespace patterns filter subscriptions
  • Not enforced by library: caller must ensure app-level isolation

Semantics:

  • InMemoryEventStore: Single namespace (no prefix)
  • JetStreamEventStore: Optional namespace prefix on stream name
  • EventBus: Exact namespace patterns + wildcard patterns

Example:

// Two stores, completely isolated
store1, _ := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-a")
store2, _ := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-b")

// store1 events invisible to store2
// Events in store1 go to stream "tenant-a_events"
// Events in store2 go to stream "tenant-b_events"

Code Analysis

Current Implementation vs Intended Model

Aggregate: ActorEventStream

Intended: Single-entity aggregate protecting monotonic version invariant

Current:

  • EventStore interface (event.go): Contract for persistence

    • SaveEvent validates version > current (enforces invariant)
    • GetEvents returns events (read model)
    • GetLatestVersion returns max version (read model)
    • ✓ Aligns: Invariant is in SaveEvent signature/contract
  • InMemoryEventStore (store/memory.go): Concrete implementation

    • mu sync.RWMutex: Protects event list
    • events map[string][]*Event: Stores events per actor
    • SaveEvent: Validates version before appending
    • ✓ Aligns: Invariant enforced in SaveEvent
  • JetStreamEventStore (store/jetstream.go): Production implementation

    • mu sync.Mutex: Protects version checks
    • versions map[string]int64: Caches latest version
    • SaveEvent: Validates version in mutex critical section
    • ✓ Aligns: Invariant enforced in SaveEvent with concurrency protection

Design Decision: Why isn't ActorEventStream a concrete type?

Because Aether is a library of primitives, not a framework. The EventStore interface is the aggregate boundary:

  • EventStore.SaveEvent = validate and persist
  • Caller is responsible for: creating event, setting version, deciding action on conflict
  • This is not an omission—it's the design: caller controls retry logic, library enforces invariant

Commands

Intended: SaveEvent, GetLatestVersion, GetEvents, GetEventsWithErrors

Current:

  • SaveEvent → ✓ Aligns (explicit command)
  • GetLatestVersion → ✓ Aligns (read model query)
  • GetEvents → ✓ Aligns (read model query)
  • GetEventsWithErrors → ✓ Aligns (read model query with error visibility)

Events

Intended: EventStored (published), VersionConflict (error response)

Current:

  • Event struct (event.go): ✓ Aligns

    • Data, Metadata, Timestamp captured
    • SetMetadata helpers for tracing
    • ID for deduplication
    • Version for ordering
  • EventBus (eventbus.go): ✓ Aligns

    • Publish(namespace, event) → distributes EventStored
    • Namespace-scoped subscriptions
    • Wildcard patterns for cross-namespace
    • ✓ Correct: EventStored is not emitted by the store itself; the SaveEvent caller publishes it via Publish after a successful save
  • VersionConflictError (event.go): ✓ Aligns

    • Returned from SaveEvent on conflict
    • Contains context: ActorID, AttemptedVersion, CurrentVersion
    • Caller sees it and decides next action

Policies

Intended:

  • Version Validation → ✓ Aligns (SaveEvent enforces)
  • Append-Only → ✓ Aligns (no delete/update in interface)
  • Idempotent Publishing → ✓ Aligns (JetStream dedup by message ID)
  • Snapshot Invalidation → ✗ Missing (not yet implemented)

Read Models

Intended:

  • EventStream (GetEvents) → ✓ Aligns
  • CurrentVersion (GetLatestVersion) → ✓ Aligns
  • StateSnapshot (GetLatestSnapshot) → ✓ Aligns
  • Namespace-Scoped Events (EventBus.Subscribe) → ✓ Aligns

Snapshots

Intended: Value object, separate from events

Current:

  • ActorSnapshot struct (event.go): ✓ Aligns
  • SnapshotStore interface (event.go): ✓ Aligns
    • SaveSnapshot, GetLatestSnapshot
    • Separate from EventStore (composition via interface)

Namespace Isolation

Intended: Logical boundary, EventStore and EventBus both support

Current:

  • InMemoryEventStore: No namespace support (testing only)
  • JetStreamEventStore: ✓ Aligns
    • NewJetStreamEventStoreWithNamespace: Convenience function
    • JetStreamConfig.Namespace: Prefix on stream name
    • Sanitization of subject characters
    • Complete isolation at storage layer
  • EventBus: ✓ Aligns
    • Subscribe(namespacePattern): Exact + wildcard patterns
    • Publish(namespaceID, event): Delivers to pattern-matching subscribers
    • MatchNamespacePattern: NATS-style subject matching

Design Decisions & Rationale

Decision 1: Version Passed by Caller, Not Auto-Incremented

Intended Design:

// Caller responsible for versioning
currentVersion, _ := store.GetLatestVersion(actorID)
event.Version = currentVersion + 1
err := store.SaveEvent(event)

Alternative (Not Taken):

// Library increments version
event.Version = 0 // Library fills in
err := store.SaveEvent(event)

Why Caller-Controlled?

  1. Optimistic Concurrency Control: The caller states the version it expects to write. If another writer moved forward, caller can detect and decide:

    • Idempotent command? Skip, already done
    • Concurrent command? Merge their changes and retry
    • Conflicting? Backoff and alert
  2. Clarity: Version is not magic. Caller knows exactly what version they're writing.

  3. Idempotence: Caller can use same event ID + version pair to detect and skip duplicates.

  4. Flexibility: Different domains have different conflict resolution strategies. Library doesn't impose one.

Cost: Caller must manage version explicitly. But this is intentional: primitives over frameworks.


Decision 2: Fail on Conflict, Don't Retry

Intended Design:

err := store.SaveEvent(event)
if errors.Is(err, aether.ErrVersionConflict) {
    // Caller decides: retry, skip, or backoff
}

Alternative (Not Taken):

err := store.SaveEvent(event) // Always succeeds (auto-retries internally)

Why Fail Explicitly?

  1. Observability: Caller sees conflict and can log/metric it.

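  2. Idempotence Safety: Auto-retry + auto-increment could turn a conflict into a duplicate or stale write:

    • Attempt 1: version 2, fails (another writer got version 2)
    • Auto-retry: version 3, succeeds, but the event was derived from state at version 1 and never re-checked against version 2
    • If the other writer was an earlier attempt of the same command, the same fact is now stored twice, and the caller never learns that either problem occurred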
  3. Control: Caller decides retry strategy:

    • Backoff + retry: For transient contention
    • Skip: For duplicate retries of same command
    • Alert: For unexpected behavior
    • Fail-fast: For critical paths

Cost: Caller must handle retries. But conflicts should be rare: they only occur within one actor's stream, and concurrent writers usually target different actors.


Decision 3: Snapshots Separate from EventStore

Intended Design:

// Snapshot is optional optimization, not required
type SnapshotStore interface {
    EventStore
    SaveSnapshot(snapshot *ActorSnapshot) error
    GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
}

Alternative (Not Taken):

// Snapshot baked into EventStore interface
type EventStore interface {
    SaveEvent(event *Event) error
    GetEvents(actorID string, fromVersion int64) ([]*Event, error)
    SaveSnapshot(snapshot *ActorSnapshot) error
    GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
}

Why Separate Interface?

  1. Optional: Many domains don't need snapshots (small event streams).

  2. Composition: Caller depends on SnapshotStore only if needed:

    var es aether.EventStore = store.NewInMemoryEventStore()
    // No snapshots
    
    var snapshotStore aether.SnapshotStore = store.NewJetStreamEventStore(...)
    // With snapshots
    
  3. Clarity: Snapshot logic (when to snapshot, when to use) is domain concern, not library concern.

Cost: Caller must check interface type to access snapshots. But this is fine—snapshots are optimization, not core.
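The interface check amounts to a type assertion. In this sketch the interfaces are trimmed local copies, and replayStart is a hypothetical helper that falls back to full replay when the store does not implement SnapshotStore.

```go
package main

import "fmt"

// Trimmed local stand-ins for the types and interfaces in this document.
type Event struct{ Version int64 }

type ActorSnapshot struct {
	ActorID string
	Version int64
}

type EventStore interface {
	GetEvents(actorID string, fromVersion int64) ([]*Event, error)
}

type SnapshotStore interface {
	EventStore
	GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
}

type plainStore struct{}

func (plainStore) GetEvents(string, int64) ([]*Event, error) { return nil, nil }

// snapStore embeds plainStore for GetEvents and adds snapshot support.
type snapStore struct{ plainStore }

func (snapStore) GetLatestSnapshot(id string) (*ActorSnapshot, error) {
	return &ActorSnapshot{ActorID: id, Version: 10}, nil
}

// replayStart uses a snapshot only when the store offers one.
func replayStart(s EventStore, actorID string) (int64, error) {
	ss, ok := s.(SnapshotStore)
	if !ok {
		return 0, nil // no snapshot support: replay everything
	}
	snap, err := ss.GetLatestSnapshot(actorID)
	if err != nil {
		return 0, err
	}
	if snap == nil {
		return 0, nil // supported, but no snapshot taken yet
	}
	return snap.Version + 1, nil
}

func main() {
	a, _ := replayStart(plainStore{}, "x")
	b, _ := replayStart(snapStore{}, "x")
	fmt.Println(a, b) // 0 11
}
```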


Decision 4: Metadata Tracing Fields on Event

Intended Design:

event.SetCorrelationID(correlationID)
event.SetCausationID(causationID)
event.SetUserID(userID)
event.SetTraceID(traceID)

Why Include?

  1. Auditability: Know who caused what, in what order (causation chains).

  2. Observability: Trace IDs link to distributed tracing (OpenTelemetry).

  3. No Enforcement: Metadata is optional. Caller sets if needed.

  4. Standard Names: Metadata keys (MetadataKeyCorrelationID, etc.) are constants, not magic strings.

Cost: Event struct has extra field. But it's map[string]string, not dozens of fields.


Decision 5: Namespace Isolation via Stream Prefixing (JetStream)

Intended Design:

storeA, _ := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-a")
// Creates stream: "tenant-a_events"

storeB, _ := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-b")
// Creates stream: "tenant-b_events"
// Complete isolation: events in tenant-a invisible to tenant-b

Alternative (Not Taken):

// Single stream, subject-based filtering
// Stream "events" with subjects: "events.tenant-a.>", "events.tenant-b.>"
// But then sharing consumers between tenants is risky

Why Stream-Level Prefix?

  1. Strong Isolation: Each namespace has its own stream. No cross-contamination.

  2. Storage Guarantee: Compliance/legal: tenant data completely separated at storage layer.

  3. Independent Scaling: Each tenant's stream can have different retention/replicas.

  4. Simplicity: No subject-level filtering logic needed.

Cost: Multiple streams for multiple namespaces. But JetStream handles this efficiently.


Decision 6: Replay Errors Visible (EventStoreWithErrors Interface)

Intended Design:

type EventStoreWithErrors interface {
    EventStore
    GetEventsWithErrors(actorID string, fromVersion int64) (*ReplayResult, error)
}

// Returns both valid events and errors
result, _ := store.GetEventsWithErrors(actorID, 0)
for _, err := range result.Errors {
    log.Printf("Corrupted event at sequence %d: %v", err.SequenceNumber, err.Err)
}

Alternative (Not Taken):

// Silently skip malformed events
events, _ := store.GetEvents(actorID, 0)
// Corruption invisible

Why Expose Errors?

  1. Data Quality Visibility: Silent data loss is the worst. Caller must see it.

  2. Recovery: Operator can manually inspect and repair corrupted data.

  3. Alerting: Corrupted events trigger alerts (e.g., Prometheus metric).

  4. Compliance: Audit trail shows if/when data quality degraded.

Cost: Caller must handle errors. But better than silent data loss.

Backward Compatibility: GetEvents still silently skips (for callers that don't care). GetEventsWithErrors is opt-in.


Alignment with Vision

"Primitives Over Frameworks"

This context provides primitives:

  • Event: Immutable fact
  • EventStore: Interface for persistence (two implementations)
  • Version: Monotonic order
  • Namespace: Logical boundary

Not a framework:

  • No event schema enforcement (Data is map[string]interface{})
  • No command handlers (Caller handles SaveEvent)
  • No projections (Caller builds read models)
  • No sagas (Caller coordinates multi-aggregate transactions)

Caller composes these into their domain model.

"NATS-Native"

Event Sourcing uses:

  • JetStreamEventStore: Built on JetStream, not bolted on
  • NATSEventBus: Distributes events via JetStream subjects
  • Namespace isolation: Uses stream naming convention, not generic filtering

"Resource Conscious"

  • InMemoryEventStore: Minimal overhead (map + mutex)
  • JetStreamEventStore: Leverages NATS JetStream efficiency
  • No unnecessary serialization: Events stored as JSON (standard, efficient)
  • Caching: Version cache in JetStreamEventStore reduces repeated lookups

"Events as Complete History"

  • Append-only: Events never deleted
  • Immutable: Events never modified
  • Durable: JetStream persists to disk
  • Replayable: Full event history available for state reconstruction

Gaps & Improvements

Gap 1: Snapshot Invalidation Policy (Not Implemented)

Current: Snapshots are never invalidated. If snapshot is stale (many events since snapshot), replay is slow.

Intended: Snapshot valid only until MaxEventsWithoutSnapshot new events are added.

Improvement:

  • Add a MaxVersionDelta threshold (the MaxEventsWithoutSnapshot limit above) to GetLatestSnapshot logic
  • If (latestVersion - snapshot.Version) < MaxVersionDelta → use snapshot
  • If (latestVersion - snapshot.Version) >= MaxVersionDelta → replay all events

Effort: Low (logic in GetEvents + snapshot)
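The proposed rule for Gap 1 reduces to a small pure function. chooseReplayStart is hypothetical, and MaxVersionDelta is passed in as a parameter because this document does not fix its value.

```go
package main

import "fmt"

// chooseReplayStart uses the snapshot only while fewer than maxVersionDelta
// events have been appended since it was taken; otherwise (or without a
// snapshot) it replays from the beginning, i.e. fromVersion 0.
func chooseReplayStart(latestVersion, snapshotVersion int64, hasSnapshot bool, maxVersionDelta int64) (fromVersion int64, useSnapshot bool) {
	if !hasSnapshot || latestVersion-snapshotVersion >= maxVersionDelta {
		return 0, false
	}
	return snapshotVersion + 1, true
}

func main() {
	fmt.Println(chooseReplayStart(120, 100, true, 50)) // 101 true
	fmt.Println(chooseReplayStart(200, 100, true, 50)) // 0 false
	fmt.Println(chooseReplayStart(10, 0, false, 50))   // 0 false
}
```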


Gap 2: Bulk Operations (Not Implemented)

Current: SaveEvent is one event at a time. No transaction for multiple events.

Intended: SaveMultipleEvents for atomic saves of related events.

Improvement:

  • Add SaveMultipleEvents(actorID, events) to EventStore interface
  • Validates all versions are consecutive + greater than current
  • Saves all or none (atomic)
  • Useful for: snapshot + first event after, or replayed events

Effort: Medium (needs careful concurrency handling in JetStream)


Gap 3: Event Schema Evolution (Not Addressed)

Current: Event.Data is map[string]interface{}. No type safety, no schema validation.

Intended: Caller responsible for versioning (e.g., v1/v2 fields in Data map).

Improvement: (Probably out of scope for Event Sourcing context)

  • Add EventType versioning (e.g., "OrderPlacedV1", "OrderPlacedV2")
  • Or: Add field in Data (e.g., Data["_schema"] = "v2")
  • Document best practices for schema evolution

Effort: High (requires design + examples)


Gap 4: Event Deduplication on Replay (Not Implemented)

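Current: InMemoryEventStore does not deduplicate, and JetStream only deduplicates message IDs within its duplicate window, so beyond that the caller must avoid saving the same event ID twice.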

Intended: EventStore rejects duplicate event IDs for same actor.

Improvement:

  • Track event IDs per actor
  • SaveEvent: Check if event ID already exists for actor
  • If yes: Return VersionConflictError or DuplicateEventError
  • InMemoryEventStore: Add idMap; JetStreamEventStore already has dedup via message ID

Effort: Low (simple map check)
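The idMap check for the in-memory store could look roughly like the sketch below. Event, dedupStore, and DuplicateEventError are simplified or hypothetical types. This sketch checks for duplicates before it checks versions, so a retried save of an already stored event reports a duplicate rather than a version conflict. It records the ID only after a successful save.

```go
package main

import (
	"fmt"
	"sync"
)

type Event struct {
	ID      string
	ActorID string
	Version int64
}

// DuplicateEventError is the hypothetical error named in the improvement above.
type DuplicateEventError struct {
	ActorID string
	EventID string
}

func (e *DuplicateEventError) Error() string {
	return fmt.Sprintf("event %s already stored for actor %s", e.EventID, e.ActorID)
}

type dedupStore struct {
	mu     sync.Mutex
	events map[string][]Event
	idMap  map[string]map[string]struct{} // actorID -> set of stored event IDs
}

func newDedupStore() *dedupStore {
	return &dedupStore{
		events: make(map[string][]Event),
		idMap:  make(map[string]map[string]struct{}),
	}
}

func (s *dedupStore) SaveEvent(ev Event) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	ids := s.idMap[ev.ActorID] // reading a nil inner map is safe
	if _, seen := ids[ev.ID]; seen {
		return &DuplicateEventError{ActorID: ev.ActorID, EventID: ev.ID}
	}
	var current int64
	if evs := s.events[ev.ActorID]; len(evs) > 0 {
		current = evs[len(evs)-1].Version
	}
	if ev.Version <= current {
		return fmt.Errorf("version conflict for %s: %d <= %d", ev.ActorID, ev.Version, current)
	}
	if ids == nil {
		ids = make(map[string]struct{})
		s.idMap[ev.ActorID] = ids
	}
	ids[ev.ID] = struct{}{} // record the ID only once the save succeeds
	s.events[ev.ActorID] = append(s.events[ev.ActorID], ev)
	return nil
}
```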


Gap 5: Time Travel Queries (Not Implemented)

Current: Can only replay forward (from version N onward).

Intended: Query state at specific point in time or version.

Improvement: (Probably out of scope)

  • Add GetEventsUntilVersion(actorID, version) to get events up to the given version
  • Caller can implement "state at timestamp T" by filtering the returned events by timestamp

Effort: Low (small addition to EventStore interface)


Gap 6: Distributed Tracing Integration (Partial)

Current: Metadata has fields for TraceID/SpanID, but there is no automatic integration.

Intended: Automatic trace context propagation.

Improvement:

  • Add helper to extract trace context from context.Context
  • SaveEvent extracts TraceID/SpanID from context and sets on event
  • Or: Metrics publisher emits spans for event persistence

Effort: Medium (OpenTelemetry integration)


Testing Considerations

Unit Tests

EventStore contract tests (to verify implementations):

  • SaveEvent rejects version <= current
  • SaveEvent accepts version > current
  • GetLatestVersion returns the maximum version across the actor's events
  • GetEvents filters by fromVersion
  • GetEventsWithErrors separates good events from corrupted ones
  • Saving the same event ID twice does not create a duplicate (if Gap 4 is implemented)
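The version rules above can be written once and run against every implementation. This is a sketch of such a contract check. versionedStore, checkVersionContract, and sliceStore are hypothetical, and the method signatures are simplified, so they may not match the real EventStore interface.

```go
package main

import "fmt"

type Event struct {
	ID      string
	ActorID string
	Version int64
}

// versionedStore is a hypothetical, simplified subset of the EventStore interface.
type versionedStore interface {
	SaveEvent(ev Event) error
	GetLatestVersion(actorID string) (int64, error)
	GetEvents(actorID string, fromVersion int64) ([]Event, error)
}

// checkVersionContract applies the version rules listed above to a fresh store.
func checkVersionContract(newStore func() versionedStore) error {
	s := newStore()
	if err := s.SaveEvent(Event{ID: "e1", ActorID: "a", Version: 1}); err != nil {
		return fmt.Errorf("first save rejected: %w", err)
	}
	if err := s.SaveEvent(Event{ID: "e2", ActorID: "a", Version: 3}); err != nil {
		return fmt.Errorf("version > current rejected: %w", err)
	}
	for _, v := range []int64{3, 2} {
		if err := s.SaveEvent(Event{ID: fmt.Sprintf("stale-%d", v), ActorID: "a", Version: v}); err == nil {
			return fmt.Errorf("version %d <= current accepted", v)
		}
	}
	if v, err := s.GetLatestVersion("a"); err != nil || v != 3 {
		return fmt.Errorf("GetLatestVersion = %d, %v; want 3", v, err)
	}
	// fromVersion 2 yields only version 3 whether the bound is inclusive or exclusive.
	evs, err := s.GetEvents("a", 2)
	if err != nil || len(evs) != 1 || evs[0].Version != 3 {
		return fmt.Errorf("GetEvents(a, 2) = %v, %v; want [version 3]", evs, err)
	}
	return nil
}

// sliceStore is a deliberately naive reference store used to exercise the check.
type sliceStore struct{ events map[string][]Event }

func newSliceStore() versionedStore { return &sliceStore{events: map[string][]Event{}} }

func (s *sliceStore) GetLatestVersion(actorID string) (int64, error) {
	var latest int64
	for _, ev := range s.events[actorID] {
		if ev.Version > latest {
			latest = ev.Version
		}
	}
	return latest, nil
}

func (s *sliceStore) SaveEvent(ev Event) error {
	current, _ := s.GetLatestVersion(ev.ActorID)
	if ev.Version <= current {
		return fmt.Errorf("version conflict: %d <= %d", ev.Version, current)
	}
	s.events[ev.ActorID] = append(s.events[ev.ActorID], ev)
	return nil
}

func (s *sliceStore) GetEvents(actorID string, fromVersion int64) ([]Event, error) {
	var out []Event
	for _, ev := range s.events[actorID] {
		if ev.Version >= fromVersion {
			out = append(out, ev)
		}
	}
	return out, nil
}
```

In a Go test suite, each implementation's `_test.go` file would call the shared check from a `testing.T` test and fail on a non-nil error.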

Value object tests:

  • Event immutability (no public setters)
  • Version comparison
  • Metadata helpers (SetMetadata, GetMetadata, etc.)

Integration Tests

EventBus + EventStore + Namespace isolation:

  • Publish to namespace A; verify that only namespace A subscribers receive the event
  • Wildcard subscription receives from multiple namespaces
  • Event filtering (EventTypes, ActorPattern) works correctly

JetStream specifics:

  • Multiple instances of JetStreamEventStore with different namespaces don't interfere
  • Version cache is invalidated correctly on SaveEvent
  • Corrupted events in JetStream trigger ReplayError

Brownfield Migration

If integrating Event Sourcing into an existing system:

  1. Start with InMemoryEventStore in tests
  2. Add JetStreamEventStore for integration tests (with NATS running)
  3. Gradually migrate command handlers to use SaveEvent
  4. Add EventBus subscribers for downstream contexts
  5. Monitor VersionConflict errors to understand contention

References

Key Files

  • /Users/hugo.nijhuis/src/github/flowmade-one/aether/event.go: Event, EventStore, VersionConflictError, ActorSnapshot, SnapshotStore, EventStoreWithErrors
  • /Users/hugo.nijhuis/src/github/flowmade-one/aether/store/memory.go: InMemoryEventStore
  • /Users/hugo.nijhuis/src/github/flowmade-one/aether/store/jetstream.go: JetStreamEventStore with namespace support
  • /Users/hugo.nijhuis/src/github/flowmade-one/aether/eventbus.go: EventBus, EventBroadcaster, SubscriptionFilter
  • /Users/hugo.nijhuis/src/github/flowmade-one/aether/pattern.go: Namespace pattern matching, actor pattern matching

Related Bounded Contexts

  • Clustering: Uses EventStore + EventBus for distributed coordination (leader election, shard assignment)
  • Event Bus (NATS): Extends EventBus with cross-node distribution via NATS JetStream
  • Actor Model (downstream): Uses EventStore to persist actor state changes and broadcast via EventBus

Summary: Invariants, Aggregates, Commands, Events

| Artifact | Name | Details |
|---|---|---|
| Invariant 1 | Monotonic Versions | Version > previous version for same actor |
| Invariant 2 | Append-Only | Events never deleted or modified |
| Aggregate | ActorEventStream | Root: actor + current version; enforces monotonic versions |
| Command 1 | SaveEvent | Persist event; fail if version conflict |
| Command 2 | GetLatestVersion | Read current version (for optimistic concurrency) |
| Command 3 | GetEvents | Replay events from version N |
| Command 4 | GetEventsWithErrors | Replay with visibility into corrupted events |
| Event 1 | EventStored (implicit) | Published to EventBus after SaveEvent succeeds |
| Event 2 | VersionConflict (error response) | Returned from SaveEvent on conflict |
| Policy 1 | Version Validation | SaveEvent enforces version > current |
| Policy 2 | Append-Only | No delete/update in EventStore interface |
| Policy 3 | Idempotent Publishing | JetStream dedup by event ID |
| Read Model 1 | EventStream | GetEvents returns list of events |
| Read Model 2 | CurrentVersion | GetLatestVersion returns max version |
| Read Model 3 | StateSnapshot | GetLatestSnapshot returns latest snapshot |
| Read Model 4 | Namespace-Scoped Events | EventBus.Subscribe delivers matching events |