Files
aether/.product-strategy/MODELING_RESULTS.md
Hugo Nijhuis 271f5db444
Some checks failed
CI / build (push) Successful in 21s
CI / integration (push) Failing after 2m1s
Move product strategy documentation to .product-strategy directory
Organize all product strategy and domain modeling documentation into a
dedicated .product-strategy directory for better separation from code.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-12 23:57:20 +01:00

16 KiB

Event Sourcing Domain Model - Modeling Results

What Was Modeled

The Event Sourcing bounded context for Aether distributed actor system, using tactical Domain-Driven Design.

Bounded Context Scope:

  • Responsibility: Persist events as immutable source of truth; enable state reconstruction through replay
  • Language: Event, Version, Snapshot, ActorID, Replay, Namespace
  • Invariants: Monotonic versions per actor; append-only persistence
  • Key Stakeholders: Library users writing event-sourced applications

Core Finding: One Invariant, One Aggregate

Invariant: Version must be > previous version for same actor

     +─────────────────────────────────────┐
     │  Aggregate: ActorEventStream        │
     │  (Root Entity)                      │
     │                                     │
     │  - ActorID: identifier              │
     │  - CurrentVersion: int64 (mutable) │
     │                                     │
     │  Commands:                          │
     │  ├─ SaveEvent: persist + validate  │
     │  ├─ GetLatestVersion: read current │
     │  └─ GetEvents: replay              │
     │                                     │
     │  Policy: Version > previous?        │
     │  ├─ YES → persist event            │
     │  └─ NO → return VersionConflictError
     │                                     │
     │  Events: EventStored (implicit)    │
     │                                     │
     │  Value Objects:                     │
     │  ├─ Event (immutable)              │
     │  ├─ Version (int64)                │
     │  └─ ActorSnapshot                  │
     │                                     │
     └─────────────────────────────────────┘

Why Only One Aggregate?

  • Aggregates protect invariants
  • Event Sourcing context has one invariant: monotonic versioning
  • Events are immutable (no entity lifecycle rules)
  • Snapshots are optional (stored separately)

The Critical Design Decisions

Decision 1: Version Passed by Caller (Not Auto-Incremented)

Caller Flow:

1. currentVersion := store.GetLatestVersion(actorID)
   └─ Returns: 5 (or 0 if new actor)

2. event.Version = currentVersion + 1
   └─ Set version to 6

3. err := store.SaveEvent(event)
   └─ If another writer set version 6 first → VersionConflictError
   └─ If no conflict → event persisted at version 6

Why Not Auto-Increment?

  • Caller knows whether event is idempotent (same command = safe to skip if already saved)
  • Caller knows expected previous version (optimistic concurrency control)
  • Caller decides retry strategy (immediate, backoff, circuit-break, skip)
  • Auto-increment would hide duplicate writes

Cost: Caller must manage versions. But this is intentional: "primitives over frameworks".


Decision 2: Fail on Conflict (Don't Auto-Retry)

SaveEvent Behavior:

Input: Event{Version: 6, ActorID: "order-123"}
Current Version: 5

Check: Is 6 > 5?
├─ YES → Persist, return nil
└─ NO → Return VersionConflictError{
         ActorID: "order-123",
         AttemptedVersion: 6,
         CurrentVersion: 5
       }

Caller sees error and decides:
├─ Legitimate concurrent write? → Get new version, retry with version 7
├─ Duplicate command? → Skip (event already saved)
├─ Unexpected? → Alert ops
└─ Critical path? → Fail fast

Why Not Auto-Retry?

  • Auto-retry + auto-increment could turn concurrent write into invisible duplicate
  • Library can't tell "new command" from "duplicate command"
  • Caller must decide, and library must report conflict clearly

Decision 3: Snapshots Separate from Events

Optional Composition:

var store aether.EventStore = inmem.NewInMemoryEventStore()
// No snapshots - for testing

var snapshotStore aether.SnapshotStore = jsMem.NewJetStreamEventStore(...)
// With snapshots - composition via interface

Why Separate?

  • Many domains don't need snapshots (small event streams)
  • Snapshot strategy (when to snapshot, when to use) is domain concern
  • Caller can add snapshotting logic only if needed

The Aggregate: ActorEventStream

ActorEventStream protects monotonic versioning invariant

Data:
├─ ActorID (string): Identifier
├─ CurrentVersion (int64): Latest version seen
└─ Namespace (optional): For isolation

Commands:
├─ SaveEvent(event) → error
│  ├─ Validates: event.Version > currentVersion
│  ├─ Success: Event persisted, currentVersion updated
│  └─ Failure: VersionConflictError returned
├─ GetLatestVersion() → int64
│  └─ Returns: Max version, or 0 if new
├─ GetEvents(fromVersion) → []*Event
│  └─ Returns: Events where version >= fromVersion
└─ GetEventsWithErrors(fromVersion) → (*ReplayResult, error)
   └─ Returns: Events + errors (for corrupted data visibility)

Policies Enforced:
├─ Version Validation: version > current before persist
├─ Append-Only: No delete/update operations
├─ Idempotent Publishing: JetStream dedup by event ID
└─ Immutability: Events treated as immutable after storage

Lifecycle:
├─ Created: When first event is saved (version > 0)
├─ Active: As events are appended
└─ Destroyed: N/A (event stream persists forever)

Commands, Events, and Policies

Command Flow:

    ┌──────────────────────────────┐
    │  SaveEvent (command)         │
    │  Input: Event{...}           │
    └──────────────────────────────┘
                │
                ├─ Preconditions:
                │  ├─ event != nil
                │  ├─ event.ID != ""
                │  ├─ event.ActorID != ""
                │  ├─ event.Version > 0
                │  └─ event.Version > currentVersion ← INVARIANT CHECK
                │
                ├─ Policy: Version Validation
                │  └─ If version <= current → VersionConflictError
                │
                └─ Success: Persist to store
                   │
                   ├─ Policy: Append-Only
                   │  └─ Event added to stream (never removed/modified)
                   │
                   ├─ Policy: Idempotent Publishing
                   │  └─ JetStream dedup by message ID
                   │
                   └─ Event Published: EventStored (implicit)
                      └─ Delivered to EventBus subscribers


Read Commands:

    GetLatestVersion → int64
    ├─ Scans all events for actor
    └─ Returns max version (or 0 if new)

    GetEvents(fromVersion) → []*Event
    ├─ Replay from specified version
    └─ Silently skips corrupted events

    GetEventsWithErrors(fromVersion) → (*ReplayResult, error)
    └─ Returns both events and errors (caller sees data quality)

Read Models (Projections)

From SaveEvent + GetEvents, derive:

1. EventStream: Complete history for actor
   └─ Query: GetEvents(actorID, 0)
   └─ Use: Replay to reconstruct state

2. CurrentVersion: Latest version number
   └─ Query: GetLatestVersion(actorID)
   └─ Use: Prepare next SaveEvent (version + 1)

3. StateSnapshot: Point-in-time state
   └─ Query: GetLatestSnapshot(actorID)
   └─ Use: Skip early events, replay only recent ones

4. Namespace-Scoped Events: Cross-subscriber coordination
   └─ Query: EventBus.Subscribe(namespacePattern)
   └─ Use: React to events in specific namespace

Namespace Isolation (Cross-Cutting Concern)

Namespace Isolation enforces:

Rule 1: Events in namespace X invisible to namespace Y
├─ Storage: JetStreamEventStore creates separate stream per namespace
│  └─ Stream names: "tenant-a_events" vs "tenant-b_events"
├─ Pub/Sub: EventBus maintains separate subscriber lists
│  └─ exactSubscribers[namespace] stores subscribers for exact match
└─ Result: Complete isolation at both layers

Rule 2: Namespace names must be NATS-safe
├─ No wildcards (*), no ">" sequences
├─ Sanitized: spaces → _, dots → _, etc.
└─ Result: Valid NATS subject tokens

Rule 3: Wildcard subscriptions bypass isolation (intentional)
├─ Patterns like "*" and ">" can match multiple namespaces
├─ Use case: Logging, monitoring, auditing (trusted components)
├─ Security: Explicitly documented as bypassing isolation
└─ Recommendation: Restrict wildcard access to system components

Example:
  Publish: "OrderPlaced" to namespace "prod.tenant-a"
  Exact subscriber "prod.tenant-a" → sees it
  Exact subscriber "prod.tenant-b" → doesn't see it
  Wildcard subscriber "prod.*" → sees it (intentional)
  Wildcard subscriber "*" → sees it (intentional)

Value Objects

Event: Immutable fact
├─ ID: Unique identifier (deduplication key)
├─ EventType: Domain language (e.g., "OrderPlaced")
├─ ActorID: What aggregate this concerns
├─ Version: Order in stream
├─ Data: map[string]interface{} (domain payload)
├─ Metadata: map[string]string (tracing context)
│  └─ Standard keys: CorrelationID, CausationID, UserID, TraceID, SpanID
├─ Timestamp: When event occurred
└─ CommandID: ID of command that triggered this (optional)

ActorSnapshot: Point-in-time state
├─ ActorID: Which actor
├─ Version: At this version
├─ State: map[string]interface{} (accumulated state)
└─ Timestamp: When snapshot taken

Version: Order number
├─ int64: Non-negative
├─ Semantics: > previous version for same actor
└─ Special: 0 = "no events yet"

VersionConflictError: Conflict context
├─ ActorID: Where conflict occurred
├─ AttemptedVersion: What caller tried
└─ CurrentVersion: What already exists

ReplayError: Corrupted event
├─ SequenceNumber: Position in stream
├─ RawData: Unparseable bytes
└─ Err: Unmarshal error

Code Alignment: Brownfield Assessment

Current implementation is correctly modeled. No refactoring needed.

Intended Design          →  Actual Implementation     →  Status
─────────────────────────────────────────────────────────────
Invariant: Monotonic     →  SaveEvent validates      →  ✓ Correct
Versioning              →  version > current

Append-Only Persistence →  No delete/update in       →  ✓ Correct
                          interface

SaveEvent as Command    →  func (EventStore)         →  ✓ Correct
                          SaveEvent(*Event) error

VersionConflictError    →  type VersionConflictError →  ✓ Correct
                          ActorID, AttemptedVersion,
                          CurrentVersion

GetLatestVersion        →  func (EventStore)         →  ✓ Correct
(read current)          GetLatestVersion(actorID)

GetEvents (replay)      →  func (EventStore)         →  ✓ Correct
                          GetEvents(actorID, fromVersion)

Idempotent Publishing   →  JetStream dedup by        →  ✓ Correct
                          message ID in Publish()

Namespace Isolation     →  JetStreamConfig.Namespace →  ✓ Correct
                          + stream prefixing

EventBus pub/sub        →  EventBus.Subscribe with   →  ✓ Correct
                          namespace patterns

No gaps between intended and actual. Implementation aligns with DDD model.


Design Principles Embodied

Principle 1: Primitives Over Frameworks

Library provides:

  • Event (type)
  • EventStore (interface with two implementations)
  • Version (semantics: > previous)
  • Namespace (string with restrictions)

Library does NOT provide:

  • Event schema enforcement
  • Command handlers
  • Saga coordinators
  • Projection builders
  • Retry logic

Caller composes these into domain logic.

Principle 2: NATS-Native

  • JetStreamEventStore leverages JetStream deduplication
  • Namespace isolation uses stream naming (not generic filtering)
  • EventBus can extend to NATSEventBus (cross-node via NATS)

Principle 3: Resource Conscious

  • InMemoryEventStore: Minimal overhead (map + RWMutex)
  • JetStreamEventStore: Efficient (leverages NATS JetStream)
  • No unnecessary serialization (JSON is standard, compact)
  • Caching: Version cache in JetStreamEventStore reduces lookups

Principle 4: Events as Complete History

  • Append-only: Events never deleted
  • Immutable: Events never modified
  • Durable: JetStream persists to disk
  • Replayable: Full history available

Testing Strategy (Based on Model)

Unit Tests:
├─ SaveEvent
│  ├─ Rejects version <= current
│  ├─ Accepts version > current
│  └─ Sets currentVersion to new version
├─ GetLatestVersion
│  ├─ Returns 0 for new actor
│  ├─ Returns max of all events
│  └─ Returns max even with gaps (1, 3, 5 → returns 5)
├─ GetEvents
│  ├─ Filters by fromVersion (inclusive)
│  ├─ Returns empty for nonexistent actor
│  └─ Skips corrupted events
├─ GetEventsWithErrors
│  ├─ Returns both events and errors
│  └─ Allows caller to decide on corruption
└─ Metadata
   ├─ SetMetadata/GetMetadata work
   ├─ SetCorrelationID/GetCorrelationID work
   └─ WithMetadataFrom copies all metadata

Integration Tests (OCC):
├─ Concurrent SaveEvent
│  ├─ First writer wins (version 6)
│  ├─ Second writer gets VersionConflictError
│  └─ Second can retry with version 7
├─ Idempotent Event ID (if implemented)
│  └─ Same event ID → detected as duplicate
└─ Namespace Isolation
   ├─ Events in namespace A invisible to namespace B
   ├─ Wildcard subscribers see both
   └─ Pattern matching (NATS-style) works

Brownfield Migration:
├─ Extract SaveEvent calls
├─ Handle VersionConflictError
├─ Add EventBus subscribers
└─ Monitor metrics (version conflicts = contention signal)

Key Files & Their Responsibilities

event.go
├─ Event: struct (immutable fact)
├─ EventStore: interface (contract)
├─ EventStoreWithErrors: interface (with error visibility)
├─ VersionConflictError: type (detailed error)
├─ ActorSnapshot: struct (optional)
├─ SnapshotStore: interface (optional)
└─ ReplayResult & ReplayError: types (error visibility)

store/memory.go
├─ InMemoryEventStore: implementation for testing
├─ Mutex protection: thread-safe
└─ Invariant enforcement: version > current check

store/jetstream.go
├─ JetStreamEventStore: production implementation
├─ Namespace isolation: stream prefixing
├─ Version cache: optimizes repeated lookups
├─ Deduplication: message ID for idempotency
└─ Error handling: GetEventsWithErrors for corruption visibility

eventbus.go
├─ EventBus: in-process pub/sub
├─ Namespace patterns: exact + wildcard
├─ SubscriptionFilter: event type + actor pattern
└─ Thread-safe delivery (buffered channels)

pattern.go
├─ MatchNamespacePattern: NATS-style matching
├─ MatchActorPattern: Actor ID pattern matching
└─ IsWildcardPattern: Detect wildcard subscriptions

Summary

The Event Sourcing bounded context is correctly modeled using tactical DDD:

Aspect Finding
Invariants 1 core: monotonic versioning per actor
Aggregates 1 core: ActorEventStream
Commands 4: SaveEvent, GetLatestVersion, GetEvents, GetEventsWithErrors
Events 1 implicit: EventStored (published by EventBus)
Policies 4: Version validation, append-only, idempotent publishing, immutability
Read Models 4: EventStream, CurrentVersion, StateSnapshot, Namespace-scoped
Value Objects 4: Event, ActorSnapshot, Version, VersionConflictError
Code Alignment 100% (no refactoring needed)
Design Principle Primitives over frameworks ✓
NATS Integration Native (JetStream dedup, stream naming) ✓
Gaps 4 minor (all optional, non-critical)

The model demonstrates how to apply DDD to infrastructure code where the business domain lives upstream. Perfect template for extending Aether with additional contexts.