aether/.product-strategy/DOMAIN_MODEL_OCC.md
Hugo Nijhuis 271f5db444
Move product strategy documentation to .product-strategy directory
Organize all product strategy and domain modeling documentation into a
dedicated .product-strategy directory for better separation from code.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-12 23:57:20 +01:00


Domain Model: Optimistic Concurrency Control

Summary

The Optimistic Concurrency Control (OCC) bounded context detects and signals concurrent write conflicts to the same actor. It enforces one core invariant: versions must be strictly monotonically increasing per actor. (The second invariant listed below, that a first event's version must be > 0, follows from it because a new actor's current version is 0.) This ensures event stream integrity without requiring locks. The context does not implement auto-retry; it signals conflicts fast and lets the application choose its retry strategy (immediate, exponential backoff, circuit breaker, etc.). This philosophy aligns with Aether's principle of "primitives over frameworks."

The invariant is enforced synchronously at write time, as part of SaveEvent itself rather than in a later validation pass. Readers are not blocked, and writers compete on equal terms. The first writer to persist a given version wins; later writers receive a VersionConflictError immediately, with full context (ActorID, CurrentVersion, AttemptedVersion).


Invariants

Invariant: Monotonic Version Sequence

  • Rule: For an actor, if event E1 has version V1, then any subsequent event E2 must have version V2 where V2 > V1. Non-consecutive versions (gaps) are allowed (1, 3, 5 is valid). Duplicate versions are rejected.
  • Scope: Per-actor (each ActorID has its own version sequence)
  • Why: Ensures event stream integrity, enables optimistic concurrency detection, and makes version a reliable causal ordering marker
  • Enforcement: Synchronous - checked in SaveEvent before persistence
  • Cost of violation: Data corruption (out-of-order events) or inconsistent state reconstruction on replay

Invariant: First Event Must Have Version > 0

  • Rule: For a new actor (no prior events), the first event's version must be > 0 (e.g., 1, 5, 100 all valid; 0 is invalid)
  • Scope: Per-actor
  • Why: Ensures version 0 is reserved for "no events" state. GetLatestVersion returns 0 for new actors; this creates a clear boundary between "no events" and "first event"
  • Enforcement: Synchronous - checked in SaveEvent
  • Cost of violation: Inability to distinguish between "never written" and "has one event"
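
Both invariants above reduce to one comparison against the actor's current version. The following is a minimal sketch, assuming a hypothetical helper named `acceptVersion` (it is not part of Aether's API) and treating a current version of 0 as "no events yet":

```go
package main

import "fmt"

// acceptVersion reports whether a proposed version may follow the actor's
// current version. Hypothetical helper for illustration only.
// current == 0 means the actor has no events yet.
func acceptVersion(current, proposed int64) bool {
	return proposed > current
}

func main() {
	fmt.Println(acceptVersion(0, 1)) // first event on a new actor: true
	fmt.Println(acceptVersion(0, 0)) // version 0 on a new actor: false
	fmt.Println(acceptVersion(1, 3)) // gap (1 -> 3): true
	fmt.Println(acceptVersion(3, 3)) // duplicate version: false
	fmt.Println(acceptVersion(5, 3)) // out-of-order version: false
}
```

Because a new actor starts at 0, the "first event must have version > 0" rule needs no separate branch in this sketch.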

Aggregates

Aggregate: ActorEventStream (Root)

Invariants enforced:

  • Monotonic Version Sequence (version > previous for same actor)
  • First Event Must Have Version > 0

Root Entity: ActorEventStream

  • ActorID (unique identifier, immutable)
  • CurrentVersion (latest version, mutable via SaveEvent)

Child Entities: None (kept minimal)

Value Objects:

  • Version (int64): Strictly positive integer representing event order
  • ActorID (string): Identifier for the actor (taken from the Event)

Lifecycle:

  • Created: When first event is saved (version must be > 0)
  • Modified: Each time a new event with version > current is saved
  • Destroyed: Never (history is immutable)

Transactional Boundary:

  • One actor's version sequence is one transaction boundary
  • Multiple actors can be written concurrently (no cross-actor conflicts)
  • Within an actor, only one writer succeeds (others get VersionConflictError)

Key Behavior:

  • SaveEvent(event) -> error: Persist only if event.Version > CurrentVersion. Return VersionConflictError if not.
  • GetLatestVersion(actorID) -> int64: Return CurrentVersion or 0 if no events
  • GetEvents(actorID, fromVersion) -> []*Event: Replay history from version (for state reconstruction)

Why one aggregate per actor?

  • Each actor's version sequence is a separate invariant boundary
  • No cross-actor dependencies
  • Allows concurrent writes across different actors
  • Simplifies conflict detection (per-actor comparison)
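
The per-actor boundary described above can be sketched as a map from ActorID to current version. This is an illustrative stand-in, not Aether's implementation: the type `actorStreams`, its `save` method, and the helper `raceWrites` are hypothetical names, and a single mutex guards all actors for brevity (a real store could lock per actor).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errConflict = errors.New("version conflict")

// actorStreams keeps one version sequence per ActorID (hypothetical type).
type actorStreams struct {
	mu       sync.Mutex
	versions map[string]int64 // ActorID -> CurrentVersion (0 = no events)
}

func newActorStreams() *actorStreams {
	return &actorStreams{versions: make(map[string]int64)}
}

// save accepts the write only if version is greater than the actor's current version.
func (s *actorStreams) save(actorID string, version int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if version <= s.versions[actorID] {
		return errConflict
	}
	s.versions[actorID] = version
	return nil
}

// raceWrites starts several goroutines that all try to save the same version
// for the same actor and returns how many of them succeeded.
func raceWrites(s *actorStreams, actorID string, version int64, writers int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	wins := 0
	for i := 0; i < writers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.save(actorID, version) == nil {
				mu.Lock()
				wins++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return wins
}

func main() {
	s := newActorStreams()
	fmt.Println(raceWrites(s, "order-1", 1, 8)) // 1: only one writer can claim version 1
	fmt.Println(s.save("order-2", 1))            // <nil>: a different actor has its own sequence
	fmt.Println(s.save("order-1", 1))            // version conflict
}
```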

Commands

Command: AttemptWrite

  • Aggregate: ActorEventStream
  • Input:
    • ActorID (string)
    • Event (*Event, with Version already set)
    • ProposedVersion (int64, must equal Event.Version)
  • Pre-condition:
    • ProposedVersion > CurrentVersion(ActorID)
    • Event.Version == ProposedVersion
  • Success: Event persisted; CurrentVersion advances to ProposedVersion (not necessarily by 1, since gaps are allowed)
  • Failure: VersionConflictError returned (actor has newer version)
  • Semantic: "I want to write this event with this version"
  • Owner: Application (calls SaveEvent with pre-calculated version)

Command: ReadCurrentVersion

  • Aggregate: ActorEventStream
  • Input: ActorID (string)
  • Output: int64 (version or 0 if no events)
  • Pre-condition: None
  • Success: Return latest version
  • Failure: Storage error
  • Semantic: "Tell me what version I should use for my next write"
  • Owner: Application (calls GetLatestVersion)
  • Note: This is a read, not a write command. No conflict here.

Command: RetryWrite (implicit, application-driven)

  • Aggregate: ActorEventStream
  • Description: After AttemptWrite fails with VersionConflictError, application should:
    1. Call ReadCurrentVersion to get new CurrentVersion
    2. Re-evaluate business logic (may reject, may adjust, may merge)
    3. Create new Event with NewVersion = CurrentVersion + 1
    4. Call AttemptWrite again (or give up based on policy)
  • Retry Strategy: Application chooses (immediate, backoff, circuit-breaker, give up)
  • No built-in retry: Library will not auto-retry
  • Semantic: "I lost the race; let me try again"
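
The four RetryWrite steps can be sketched as a bounded loop that lives in application code. Everything here is an assumption for illustration: `memStore` is a toy single-goroutine store, `Event` is reduced to two fields, and `retryWrite`, `stillValid`, and `maxAttempts` are hypothetical names. The callback stands in for "re-evaluate business logic".

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var ErrVersionConflict = errors.New("version conflict")

// Event is reduced to the fields the sketch needs.
type Event struct {
	ActorID string
	Version int64
}

// memStore is a toy store, not safe for concurrent use.
type memStore struct {
	versions map[string]int64
}

func (m *memStore) GetLatestVersion(actorID string) (int64, error) {
	return m.versions[actorID], nil
}

func (m *memStore) SaveEvent(e *Event) error {
	if e.Version <= m.versions[e.ActorID] {
		return ErrVersionConflict
	}
	m.versions[e.ActorID] = e.Version
	return nil
}

// retryWrite follows the steps: read version, re-evaluate, build event, attempt write.
func retryWrite(s *memStore, actorID string, stillValid func(current int64) bool, maxAttempts int) (int64, error) {
	backoff := time.Millisecond
	for attempt := 0; attempt < maxAttempts; attempt++ {
		current, err := s.GetLatestVersion(actorID) // step 1
		if err != nil {
			return 0, err
		}
		if !stillValid(current) { // step 2: the application may reject
			return 0, errors.New("write abandoned after re-evaluation")
		}
		e := &Event{ActorID: actorID, Version: current + 1} // step 3
		err = s.SaveEvent(e)                                // step 4
		if err == nil {
			return e.Version, nil
		}
		if !errors.Is(err, ErrVersionConflict) {
			return 0, err // storage errors are not retried here
		}
		time.Sleep(backoff)
		backoff *= 2
	}
	return 0, fmt.Errorf("gave up after %d attempts: %w", maxAttempts, ErrVersionConflict)
}

func main() {
	s := &memStore{versions: map[string]int64{}}
	raced := false
	v, err := retryWrite(s, "order-1", func(current int64) bool {
		if !raced { // simulate a competing writer winning the first race
			raced = true
			_ = s.SaveEvent(&Event{ActorID: "order-1", Version: current + 1})
		}
		return true
	}, 5)
	fmt.Println(v, err) // 2 <nil>
}
```

The retry policy (attempt limit, backoff shape) is a parameter of the application, in keeping with the "no built-in retry" rule.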

Events

Event: WriteFailed

  • Triggered by: AttemptWrite when version <= CurrentVersion
  • Aggregate: ActorEventStream
  • Data Captured:
    • ActorID (string)
    • ProposedVersion (int64)
    • CurrentVersion (int64)
    • EventID (string) - the event that failed to write
    • Timestamp (time.Time)
  • Consumed by:
    • Observability/logging (inform developer of conflict)
    • Metrics (track conflict rate)
    • Application handler (decide retry strategy)
  • Immutable: Yes (fact that write failed)
  • Persisted: No, WriteFailed is not persisted to event store (it's a rejection, not a state change)
  • Alternative: VersionConflictError is returned synchronously instead

Event: WriteSucceeded

  • Triggered by: AttemptWrite when version > CurrentVersion and persistence succeeds
  • Aggregate: ActorEventStream
  • Data Captured:
    • ActorID (string)
    • Version (int64)
    • EventID (string) - the event that succeeded
    • Timestamp (time.Time)
    • PreviousVersion (int64) - what CurrentVersion was before this write
  • Consumed by:
    • Observability/logging (audit trail of writes)
    • Metrics (throughput, latency)
  • Immutable: Yes
  • Persisted: Indirectly - the application's Event is persisted; WriteSucceeded is not explicitly stored but can be derived from event stream
  • Purpose: Separate concern: fact that a write succeeded vs. fact of domain event (e.g., "OrderPlaced")

Note on Event Definitions:

  • The aether.Event struct represents domain events (OrderPlaced, UserCreated, etc.)
  • WriteFailed/WriteSucceeded are infrastructure events, not domain events
  • They capture the outcome of the write attempt, not business domain changes
  • Application typically publishes domain events (via EventBus) after successful SaveEvent
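
Since WriteFailed and WriteSucceeded are not persisted, one way an application might surface them for logging and metrics is to derive an outcome record from the result of SaveEvent. The sketch below is an assumption, not library code: `WriteOutcome`, `classify`, and `errStorage` are hypothetical names, the field set follows the "Data Captured" lists above, and `VersionConflictError` is re-declared locally so the example is self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// VersionConflictError is re-declared here for the sketch.
type VersionConflictError struct {
	ActorID          string
	AttemptedVersion int64
	CurrentVersion   int64
}

func (e *VersionConflictError) Error() string {
	return fmt.Sprintf("version conflict: actor %s attempted %d, current %d",
		e.ActorID, e.AttemptedVersion, e.CurrentVersion)
}

// errStorage stands in for any non-conflict failure.
var errStorage = errors.New("storage unavailable")

// WriteOutcome is a hypothetical record covering both infrastructure events.
type WriteOutcome struct {
	Succeeded       bool
	ActorID         string
	EventID         string
	ProposedVersion int64
	CurrentVersion  int64 // set only when the write was rejected
	Timestamp       time.Time
}

// classify turns the result of a save into a WriteOutcome. Errors that are
// not version conflicts are returned unchanged: they are storage failures,
// not WriteFailed events.
func classify(actorID, eventID string, proposed int64, saveErr error) (WriteOutcome, error) {
	out := WriteOutcome{
		ActorID:         actorID,
		EventID:         eventID,
		ProposedVersion: proposed,
		Timestamp:       time.Now(),
	}
	var vErr *VersionConflictError
	switch {
	case saveErr == nil:
		out.Succeeded = true
		return out, nil
	case errors.As(saveErr, &vErr):
		out.CurrentVersion = vErr.CurrentVersion
		return out, nil
	default:
		return WriteOutcome{}, saveErr
	}
}

func main() {
	ok, _ := classify("order-1", "evt-1", 6, nil)
	fmt.Println(ok.Succeeded) // true

	conflict, _ := classify("order-1", "evt-2", 6,
		&VersionConflictError{ActorID: "order-1", AttemptedVersion: 6, CurrentVersion: 6})
	fmt.Println(conflict.Succeeded, conflict.CurrentVersion) // false 6

	_, err := classify("order-1", "evt-3", 7, errStorage)
	fmt.Println(err) // storage unavailable
}
```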

Policies

Policy: Monotonic Version Policy

  • Trigger: When SaveEvent(event) is called
  • Rule: If event.Version <= CurrentVersion(event.ActorID), reject write
  • Action: Return VersionConflictError with details
  • Context: Ensures event stream consistency without locks
  • Implementation: Synchronous check in SaveEvent

Policy: First Event Policy

  • Trigger: When SaveEvent(event) is called for a new actor (CurrentVersion == 0)
  • Rule: event.Version must be > 0
  • Action: Accept write if version > 0; reject if version <= 0
  • Context: Reserves version 0 for "no events" state
  • Implementation: Synchronous check in SaveEvent

Policy: No Auto-Retry Policy

  • Trigger: When SaveEvent returns VersionConflictError
  • Rule: Do not automatically retry
  • Action: Return error immediately; let application decide
  • Context: Application has domain context (should retry? merge? fail?) that infrastructure lacks
  • Implementation: Error return, no retry loop

Policy: No Merge Policy

  • Trigger: When concurrent writes occur
  • Rule: No automatic conflict resolution
  • Action: Reject one write; let application choose merge strategy if desired
  • Context: Event sourcing cannot auto-merge; application must decide
  • Implementation: First writer wins; others get VersionConflictError

Read Models

Read Model: LatestVersion

  • Purpose: Answer "What version should I use for my next write?"
  • Data: ActorID -> int64 (latest version, or 0)
  • Source: Derived from event stream (max version across all events for actor)
  • Query: GetLatestVersion(actorID) -> int64
  • Updated: After each successful SaveEvent
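  • Consistency: Intended to be strong (synchronous update with write); for JetStreamEventStore, see Issue 2 in the Refactoring Backlog for a possible read-after-write gap
  • Implementation: JetStreamEventStore re-fetches events via GetEvents and scans for the max version (its versions cache is currently used only by SaveEvent); InMemoryEventStore performs an O(n) scan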

Read Model: EventHistory

  • Purpose: Answer "What events happened for this actor since version X?"
  • Data: ActorID + FromVersion -> []*Event (ordered by version)
  • Source: Event stream filtered by version >= fromVersion
  • Query: GetEvents(actorID, fromVersion) -> []*Event
  • Updated: After each successful SaveEvent
  • Consistency: Strong (read-after-write)
  • Implementation: Scan JetStream or in-memory storage

Read Model: HasConflict (application-driven, not in library)

  • Purpose: Answer "Would my write conflict?"
  • Pattern: Get current version, compare with proposed version
  • Example: currentVersion, err := store.GetLatestVersion(actorID); conflict := err == nil && proposedVersion <= currentVersion
  • Note: This is only an advisory pre-check, not a guarantee: another writer can save between the check and the write. The application must still handle SaveEvent returning VersionConflictError

Code Analysis: Current Implementation

Files Analyzed

  • /Users/hugo.nijhuis/src/github/flowmade-one/aether/event.go - Event, VersionConflictError, EventStore interface
  • /Users/hugo.nijhuis/src/github/flowmade-one/aether/store/memory.go - InMemoryEventStore (test)
  • /Users/hugo.nijhuis/src/github/flowmade-one/aether/store/jetstream.go - JetStreamEventStore (production)

Intended vs. Actual

Aggregate: ActorEventStream

  • Intended: Root entity enforcing monotonic version invariant
  • Actual: Not explicitly modeled as an aggregate; instead inlined into EventStore interface
  • Implementation: EventStore.SaveEvent(event) acts as aggregate command handler; version validation happens inside

Invariant: Monotonic Version Sequence

  • Intended: Enforced at write boundary
  • Actual: Enforced in SaveEvent
    • InMemoryEventStore: Lines 27-48 (compare event.Version <= currentVersion, reject)
    • JetStreamEventStore: Lines 122-139 (same logic with mutex for thread-safety)
  • Alignment: Correct

Invariant: First Event Must Have Version > 0

  • Intended: Enforce on new actor (current version = 0)
  • Actual: Implicitly enforced by monotonic check (if version <= 0, rejected as <= currentVersion)
    • Bug risk: version 0 would be accepted if currentVersion is -1 (but that's impossible since -1 is never stored)
    • Actual: Works correctly; version > 0 is required for first event
  • Alignment: Correct

Command: AttemptWrite

  • Intended: Explicit command with pre/post conditions
  • Actual: SaveEvent method (implicit command)
    • No explicit AttemptWrite class (not needed in Go; method is sufficient)
    • Pre-conditions: event.Version > CurrentVersion (checked in SaveEvent)
    • Post-conditions: event persisted, CurrentVersion updated (happens if no error)
  • Alignment: Correct (Go idiom)

Command: ReadCurrentVersion

  • Intended: Get latest version for retry logic
  • Actual: GetLatestVersion method
    • Input: ActorID string
    • Output: int64 (version or 0)
    • Implementation (JetStreamEventStore, lines 280-298):
      • Calls GetEvents(actorID, 0) to fetch all events
      • Scans for max version
      • Bug: No caching in GetLatestVersion; every call re-fetches from JetStream
      • Note: SaveEvent has internal caching (versions map, lines 48, 160)
    • Alignment: Correct API; inefficient implementation (revisit)

Events: WriteFailed / WriteSucceeded

  • Intended: Separate infrastructure events from domain events
  • Actual: Not modeled as separate concepts
    • WriteFailed: Implicit (VersionConflictError returned)
    • WriteSucceeded: Implicit (SaveEvent returns nil)
  • Alignment: Correct behavior; naming is implicit

Policy: Monotonic Version Policy

  • Intended: If SaveEvent(event with version V), then V > CurrentVersion(event.ActorID), else reject
  • Actual: Implemented correctly
    • JetStreamEventStore: Lines 132-139 (if event.Version <= currentVersion, return VersionConflictError)
  • Alignment: Correct

Policy: No Auto-Retry Policy

  • Intended: Library returns error; application chooses retry
  • Actual: Correct
    • SaveEvent returns VersionConflictError
    • No retry loop in library
    • Documentation (CLAUDE.md) shows pattern: read version, create event, save, handle error
  • Alignment: Correct

Error Types: ErrVersionConflict & VersionConflictError

  • Intended: Sentinel for fast matching + detailed context
  • Actual: Implemented correctly
    • ErrVersionConflict (line 12): errors.New("version conflict") - sentinel
    • VersionConflictError (lines 14-29): struct with ActorID, AttemptedVersion, CurrentVersion + Unwrap() for errors.Is
    • Pattern: SaveEvent returns *VersionConflictError; caller checks errors.Is(err, ErrVersionConflict)
    • Unwrap enables chain matching: errors.Is(err, ErrVersionConflict) is true whenever err is (or wraps) a *VersionConflictError
  • Alignment: Correct (excellent Go error pattern)
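
The sentinel-plus-struct pattern described above can be reconstructed as follows. The field names and the Unwrap method come from this document; the error message format and the helper `describe` are assumptions, and the authoritative definitions remain those in event.go.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrVersionConflict is the sentinel used for fast matching with errors.Is.
var ErrVersionConflict = errors.New("version conflict")

// VersionConflictError carries the context of a rejected write.
type VersionConflictError struct {
	ActorID          string
	AttemptedVersion int64
	CurrentVersion   int64
}

// Error formats the context; the exact wording is an assumption.
func (e *VersionConflictError) Error() string {
	return fmt.Sprintf("%v: actor %q attempted version %d, current version %d",
		ErrVersionConflict, e.ActorID, e.AttemptedVersion, e.CurrentVersion)
}

// Unwrap lets errors.Is match the sentinel through the struct.
func (e *VersionConflictError) Unwrap() error { return ErrVersionConflict }

// describe shows how a caller might branch on the error (hypothetical helper).
func describe(err error) string {
	var vErr *VersionConflictError
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &vErr):
		return fmt.Sprintf("conflict on %s: attempted %d, current %d",
			vErr.ActorID, vErr.AttemptedVersion, vErr.CurrentVersion)
	default:
		return "other error: " + err.Error()
	}
}

func main() {
	var err error = &VersionConflictError{ActorID: "order-1", AttemptedVersion: 3, CurrentVersion: 5}
	wrapped := fmt.Errorf("saving event: %w", err)

	fmt.Println(errors.Is(wrapped, ErrVersionConflict)) // true
	fmt.Println(describe(wrapped))                      // conflict on order-1: attempted 3, current 5
}
```

errors.Is answers "was this a conflict?" cheaply, while errors.As recovers the versions needed to decide on a retry.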

Comparison Summary

| Concept | Intended | Actual | Status |
| --- | --- | --- | --- |
| Aggregate (ActorEventStream) | Explicit root enforcing invariant | Inlined in EventStore interface | ✓ Correct (Go style) |
| Monotonic Version Invariant | Checked at write boundary | Checked in SaveEvent | ✓ Correct |
| First Event Invariant | Version > 0 on new actor | Enforced by monotonic check | ✓ Correct |
| AttemptWrite Command | Explicit command | SaveEvent method | ✓ Correct (Go idiom) |
| ReadCurrentVersion Query | Get latest version | GetLatestVersion method | ⚠ Works but inefficient (re-fetches) |
| WriteFailed Event | Explicit error event | VersionConflictError returned | ✓ Correct (implicit) |
| WriteSucceeded Event | Explicit success event | SaveEvent returns nil | ✓ Correct (implicit) |
| Monotonic Policy | Enforce at write | Enforced in SaveEvent | ✓ Correct |
| No Auto-Retry Policy | Return error; app decides | SaveEvent returns error, no retry | ✓ Correct |
| Error Types | Sentinel + context | ErrVersionConflict + VersionConflictError | ✓ Excellent |

Refactoring Backlog

Issue 1: Inefficient GetLatestVersion in JetStreamEventStore

Current Problem:

  • GetLatestVersion (lines 280-298) calls GetEvents(actorID, 0) which re-fetches all events from JetStream
  • For an actor with thousands of events, this is O(n) in events
  • SaveEvent has an internal cache (versions map) but GetLatestVersion doesn't use it

Target:

  • SaveEvent already maintains versions cache for concurrent-write detection
  • Make GetLatestVersion use the same cache (or lazy-load on first call)

Steps:

  1. Expose cached version check: if v, cached := jes.versions[actorID]; cached { return v }
  2. If not cached, fetch from JetStream once and cache result
  3. Add cache invalidation/refresh policy (e.g., on SaveEvent update)
  4. Consider thread-safety: SaveEvent holds mu during cache update

Impact: Low (optimization, no behavior change)
Priority: Medium (correctness is fine; performance improvement)

Issue 2: Missing Cache Coherency Between SaveEvent and GetLatestVersion

Current Problem:

  • SaveEvent holds mu and updates versions cache (line 160)
  • GetLatestVersion does not hold mu and does not consult versions cache
  • Race condition: SaveEvent succeeds, updates cache; GetLatestVersion called immediately after may re-fetch stale data from JetStream

Example Scenario:

  1. Thread A: SaveEvent(actorID="order-1", version=5) succeeds, sets versions["order-1"]=5
  2. Thread B: Calls GetLatestVersion("order-1") concurrently
  3. Thread B: GetEvents returns no events (JetStream not yet replicated) due to timing
  4. Thread B: Returns 0 instead of 5

Target:

  • Ensure GetLatestVersion reads from consistent cache

Steps:

  1. Add RWMutex read lock to GetLatestVersion
  2. Check cache first (with lock)
  3. Only fetch from JetStream if not cached
  4. Consider cache TTL (time-based invalidation)

Impact: Medium (fixes race condition, affects concurrent read-after-write)
Priority: High (correctness issue)
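
The steps listed for Issues 1 and 2 (check the cache under a read lock, fetch only on a miss, let SaveEvent keep the cache current) might look like the sketch below. It is not the JetStreamEventStore code: `cachedVersions`, `recordSaved`, and the `fetch` callback (standing in for the GetEvents scan) are hypothetical, and the sketch assumes a single process writes each actor, so no TTL or cross-node invalidation is shown.

```go
package main

import (
	"fmt"
	"sync"
)

// cachedVersions shares one version cache between the write and read paths.
type cachedVersions struct {
	mu       sync.RWMutex
	versions map[string]int64
	fetch    func(actorID string) (int64, error) // expensive backend lookup (assumed)
}

// GetLatestVersion consults the cache first and fetches only on a miss.
func (c *cachedVersions) GetLatestVersion(actorID string) (int64, error) {
	c.mu.RLock()
	v, ok := c.versions[actorID]
	c.mu.RUnlock()
	if ok {
		return v, nil
	}

	fetched, err := c.fetch(actorID)
	if err != nil {
		return 0, err
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	// A concurrent save may have stored a newer version while we fetched;
	// never move the cached version backwards.
	if cur, ok := c.versions[actorID]; ok && cur >= fetched {
		return cur, nil
	}
	c.versions[actorID] = fetched
	return fetched, nil
}

// recordSaved is what the write path would call after a successful save.
func (c *cachedVersions) recordSaved(actorID string, version int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if version > c.versions[actorID] {
		c.versions[actorID] = version
	}
}

func main() {
	fetches := 0
	c := &cachedVersions{
		versions: make(map[string]int64),
		fetch: func(actorID string) (int64, error) {
			fetches++
			return 3, nil // pretend the backend holds version 3
		},
	}

	c.recordSaved("order-1", 5)
	v1, _ := c.GetLatestVersion("order-1")
	fmt.Println(v1, fetches) // 5 0

	v2, _ := c.GetLatestVersion("order-2")
	v2again, _ := c.GetLatestVersion("order-2")
	fmt.Println(v2, v2again, fetches) // 3 3 1
}
```

With this shape, a read that follows a successful save for the same actor is answered from the cache and cannot observe an older backend state.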

Issue 3: Version Cache Unbounded Memory Growth

Current Problem:

  • InMemoryEventStore has no version cache (re-scans on each GetLatestVersion)
  • JetStreamEventStore caches all versions in memory (lines 48, 160)
  • If system runs for months with millions of actors, versions map grows unbounded
  • No eviction policy (LRU, TTL, explicit flush)

Target:

  • Add cache eviction or make it bounded

Steps:

  1. Option A: LRU cache with configurable max size (default 10k actors)
  2. Option B: Time-based eviction (cache entry expires after N minutes)
  3. Option C: Explicit cache.Clear() method for testing/shutdown
  4. Add metrics: cache hits, misses, evictions

Impact: Medium (affects long-running systems with many actors)
Priority: Medium (not urgent for typical use cases)
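
Option A (an LRU cache with a configurable maximum size) can be sketched with the standard library's container/list. The type `versionLRU` and its methods are hypothetical, the sketch is not safe for concurrent use (a real store would guard it with its existing mutex), and it assumes the caller falls back to the backing store on a miss, so eviction costs a re-fetch rather than correctness.

```go
package main

import (
	"container/list"
	"fmt"
)

type entry struct {
	actorID string
	version int64
}

// versionLRU is a bounded ActorID -> version cache that evicts the least
// recently used actor when it grows past max entries.
type versionLRU struct {
	max       int
	order     *list.List               // front = most recently used
	items     map[string]*list.Element // ActorID -> element holding *entry
	evictions int                      // simple counter for metrics
}

func newVersionLRU(max int) *versionLRU {
	return &versionLRU{max: max, order: list.New(), items: make(map[string]*list.Element)}
}

// Get returns the cached version and marks the actor as recently used.
func (c *versionLRU) Get(actorID string) (int64, bool) {
	el, ok := c.items[actorID]
	if !ok {
		return 0, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).version, true
}

// Put stores or updates a version, evicting the oldest entry if needed.
func (c *versionLRU) Put(actorID string, version int64) {
	if el, ok := c.items[actorID]; ok {
		el.Value.(*entry).version = version
		c.order.MoveToFront(el)
		return
	}
	c.items[actorID] = c.order.PushFront(&entry{actorID: actorID, version: version})
	if c.order.Len() > c.max {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).actorID)
		c.evictions++
	}
}

func main() {
	c := newVersionLRU(2)
	c.Put("actor-a", 1)
	c.Put("actor-b", 2)
	c.Get("actor-a")    // actor-a is now the most recently used
	c.Put("actor-c", 3) // evicts actor-b, the least recently used

	_, hitB := c.Get("actor-b")
	vA, hitA := c.Get("actor-a")
	fmt.Println(hitB, vA, hitA, c.evictions) // false 1 true 1
}
```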

Issue 4: Document Concurrency Model Clearly

Current Problem:

  • SaveEvent uses mutex (line 123)
  • But GetEvents and GetLatestVersion are concurrent-read-only (use RWMutex or no lock)
  • Relationship between write lock and read performance is not documented

Target:

  • Document concurrency guarantees clearly in EventStore interface

Steps:

  1. Add doc comment: "SaveEvent is linearizable (total ordering for writes to same actor)"
  2. Add doc comment: "GetEvents may see eventually-consistent state (not guaranteed to see latest SaveEvent)"
  3. Add doc comment: "GetLatestVersion reflects latest SaveEvent for same actor if called after error-free SaveEvent"
  4. Example: "If SaveEvent succeeds and returns, subsequent GetLatestVersion call will see new version"

Impact: Low (documentation, no code change)
Priority: High (clarity for users)

Issue 5: First Event Version Validation Not Explicit

Current Problem:

  • First event invariant (version > 0) is not explicitly checked
  • It's implicitly enforced: if version <= 0, then version <= currentVersion (which is 0), so rejected
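  • But a version of 0 is rejected with VersionConflictError, which is misleading: the caller sees a conflict, not an invalid version, and might think 0 is otherwise valid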

Target:

  • Add explicit check and error for version 0 on first event

Steps:

  1. Add check before monotonic check: if event.Version <= 0 { return ErrInvalidVersion }
  2. Or add comment: "Monotonic check ensures version > 0 (since currentVersion starts at 0)"
  3. Add test: first event with version 0 should fail

Impact: Low (same behavior, clearer code)
Priority: Medium (clarity + prevention of future confusion)
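
Step 1 of Issue 5 could be sketched as a validation function that runs the explicit check before the monotonic one. `ErrInvalidVersion` is the name proposed in the steps above; its message text and the helper `validateVersion` are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	// ErrInvalidVersion is the proposed sentinel; the message is an assumption.
	ErrInvalidVersion  = errors.New("invalid version: must be > 0")
	ErrVersionConflict = errors.New("version conflict")
)

// validateVersion checks the explicit "> 0" rule first, then the monotonic rule.
func validateVersion(current, proposed int64) error {
	if proposed <= 0 {
		return ErrInvalidVersion
	}
	if proposed <= current {
		return ErrVersionConflict
	}
	return nil
}

func main() {
	fmt.Println(validateVersion(0, 0)) // invalid version: must be > 0
	fmt.Println(validateVersion(0, 1)) // <nil>
	fmt.Println(validateVersion(5, 3)) // version conflict
}
```

The set of accepted writes is unchanged; only the error reported for versions <= 0 becomes more specific.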

Issue 6: Version Gaps Not Documented

Current Problem:

  • Version gaps are allowed (1, 3, 5) but not documented in code
  • Users might wonder: "Is this a bug? Should versions be consecutive?"

Target:

  • Add doc comment explaining gap-tolerance

Steps:

  1. Add to EventStore.SaveEvent doc: "Versions need not be consecutive. Gaps are allowed (e.g., 1, 3, 5 is valid)."
  2. Add to CLAUDE.md: "Why gaps are OK: Allows batching, allows external version sources, increases robustness"
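  3. Add example: "If one writer uses odd versions (1, 3, 5) and another uses even (2, 4, 6), both work as long as each write's version is greater than the actor's current version when it is saved (writing 2 after 3 is a conflict)"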

Impact: Low (documentation)
Priority: Low (not urgent, but good to document)


Testing Observations

From /Users/hugo.nijhuis/src/github/flowmade-one/aether/store/jetstream_integration_test.go:

Test: TestJetStreamEventStore_SaveEvent_VersionConflict

  • Creates event with version 5 (skips 1-4)
  • Attempts to save version 3 (earlier than 5)
  • Correctly rejects with ErrVersionConflict
  • Result: ✓ Passes (monotonic check working)

Test: Duplicate Event Test

  • Saves event with ID="evt-1", version=5
  • Attempts to save same event again (same ID, same version)
  • Expected: VersionConflictError (version not > current)
  • Implementation note: JetStream deduplicates by message ID (line 154), but version check fails first
  • Result: ✓ Passes

Recommendations

Implementation Order

  1. Phase 1: Current Code

    • Monotonic version enforcement works
    • Error handling is correct
    • No auto-retry (correct)
    • Ship as-is if acceptable
  2. Phase 2: Optimization (next quarter)

    • Fix GetLatestVersion cache coherency (Issue 2)
    • Add metrics for conflict rate, retry count
    • Document concurrency model clearly (Issue 4)
  3. Phase 3: Robustness (future)

    • Add bounded cache (Issue 3)
    • Explicit first-event validation (Issue 5)
    • Version gap documentation (Issue 6)

Key Invariants to Enforce First

✓ Already enforced:

  • Monotonic version (version > previous for same actor)
  • No auto-retry (application chooses)

Still worth documenting:

  • First event must have version > 0
  • Version gaps are allowed
  • GetLatestVersion is non-transactional (separate read from write)

Integration with Other Contexts

EventBus Context:

  • After SaveEvent succeeds, application publishes domain event to EventBus
  • EventBus handles namespace isolation, pub/sub
  • OCC guarantees version ordering; EventBus adds cross-context signaling

Cluster Context:

  • ClusterManager may distribute actors across nodes
  • Each node runs OCC independently (no distributed lock)
  • Conflict detection remains local (no network coordination)
  • Caveat: If two nodes write to same actor, one will fail (network race)

Snapshot Context:

  • Snapshots use same version number as events
  • Snapshot version >= latest event version seen
  • Monotonic rule applies: snapshot can only be saved for version > previous snapshot

Anti-Patterns to Avoid

Spin Loops

Bad:

for {
    version, _ := store.GetLatestVersion(actorID)
    event.Version = version + 1
    err := store.SaveEvent(event)
    if err == nil {
        break // Success
    }
    // Spin loop - no backoff, no attempt limit, retries even on storage errors
}

Why: Contention under load; CPU waste; thundering herd.

Good:

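backoff := 10 * time.Millisecond // starting delay (illustrative value)
version, err := store.GetLatestVersion(actorID)
if err != nil {
    return err // storage error: fail fast
}
event.Version = version + 1
err = store.SaveEvent(event)
if errors.Is(err, aether.ErrVersionConflict) {
    // Log, decide whether to retry
    // If retrying, use exponential backoff
    time.Sleep(backoff)
    // Then re-read the version and attempt the write again,
    // up to a bounded number of attempts
}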

Ignoring Conflict Errors

Bad:

event := &aether.Event{Version: 5, ...}
store.SaveEvent(event) // Ignore error; assume it succeeded
// Later, expect event to be persisted → might not be

Why: Silent failures; impossible-to-debug inconsistencies.

Good:

err := store.SaveEvent(event)
if err != nil {
    // Handle conflict explicitly
    if errors.Is(err, aether.ErrVersionConflict) {
        log.Printf("write conflict; will retry with backoff: %v", err)
    } else {
        log.Printf("storage error; will fail fast: %v", err)
    }
}

No Backoff Strategy

Bad:

// Naive retry after conflict
for attempts := 0; attempts < 10; attempts++ {
    err := store.SaveEvent(event)
    if err == nil {
        break
    }
    // Try again immediately
}

Why: Under high contention, retries fail immediately, wasting compute.

Good:

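backoff := time.Millisecond
for attempts := 0; attempts < 10; attempts++ {
    err := store.SaveEvent(event)
    if err == nil {
        break
    }
    if !errors.Is(err, aether.ErrVersionConflict) {
        return err // Storage error: retrying will not help
    }
    time.Sleep(backoff)
    backoff = min(backoff*2, time.Second) // Exponential backoff, cap at 1s (min is a builtin since Go 1.21)
    // Reload version for next attempt
    version, err := store.GetLatestVersion(actorID)
    if err != nil {
        return err
    }
    event.Version = version + 1
}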

Merging Without Domain Logic

Bad:

// Conflict? Just merge fields
err := store.SaveEvent(event)
if errors.Is(err, aether.ErrVersionConflict) {
    current, _ := store.GetLatestVersion(actorID)
    event.Version = current + 1
    // Auto-merge (bad idea - might corrupt state)
    store.SaveEvent(event)
}

Why: Event sourcing doesn't auto-merge; application owns consistency.

Good:

err := store.SaveEvent(event)
if errors.Is(err, aether.ErrVersionConflict) {
    // Reload current state
    current, verr := store.GetLatestVersion(actorID)
    if verr != nil {
        return verr
    }
    // Application decides: retry? merge? fail?
    // (shouldRetry is application-defined)
    if shouldRetry(event, current) {
        event.Version = current + 1
        err = store.SaveEvent(event) // May conflict again; check err
    } else {
        log.Printf("conflict; application chose not to retry")
    }
}

Assuming GetLatestVersion is Transactional

Bad:

version, _ := store.GetLatestVersion(actorID)
// Time passes, another writer might write here
event.Version = version + 1
err := store.SaveEvent(event)
// High probability of conflict if concurrent writers, and err is treated as unexpected

Why: GetLatestVersion and SaveEvent are separate operations (not atomic).

Good:

version, err := store.GetLatestVersion(actorID)
if err != nil {
    return err
}
event.Version = version + 1
err = store.SaveEvent(event)
var vErr *aether.VersionConflictError
if errors.As(err, &vErr) {
    // Expected under concurrency - reload and retry
    log.Printf("conflict at %d (actor now at %d); will retry",
        vErr.AttemptedVersion, vErr.CurrentVersion)
    // ... retry logic ...
}

Examples

Example 1: Happy Path (No Contention)

// Get current version
version, _ := store.GetLatestVersion("order-123")
// version = 5

// Create event
event := &aether.Event{
    ID:        uuid.New().String(),
    EventType: "OrderUpdated",
    ActorID:   "order-123",
    Version:   version + 1,  // = 6
    Data:      map[string]interface{}{"status": "shipped"},
    Timestamp: time.Now(),
}

// Save (no conflict expected)
err := store.SaveEvent(event)
if err != nil {
    log.Fatalf("unexpected error: %v", err)
}
// Success: order-123 now at version 6

Example 2: Conflict (Another Writer Won)

// Scenario: Two goroutines writing to same actor concurrently
// (shown as one sequential timeline for readability)

// Goroutines A and B both do:
version, _ := store.GetLatestVersion("order-123")  // Both get 5
event1.Version = version + 1  // A prepares version 6
event2.Version = version + 1  // B prepares version 6

// Goroutine A writes
err := store.SaveEvent(event1)  // Success! order-123 now version 6

// Goroutine B writes
err = store.SaveEvent(event2)  // FAIL: VersionConflictError
if errors.Is(err, aether.ErrVersionConflict) {
    var vErr *aether.VersionConflictError
    errors.As(err, &vErr)
    log.Printf("conflict: attempted %d, current is %d",
        vErr.AttemptedVersion, vErr.CurrentVersion)

    // Retry (after re-evaluating business logic against the new state)
    newVersion := vErr.CurrentVersion + 1
    event2.Version = newVersion
    err = store.SaveEvent(event2)  // Try again with version 7
}

Example 3: First Event (New Actor)

// New actor "order-999" (no prior events)
version, _ := store.GetLatestVersion("order-999")  // Returns 0

// Must use version > 0
event := &aether.Event{
    ID:      "evt-1",
    ActorID: "order-999",
    Version: version + 1,  // = 1, which is > 0, good
    ...
}
err := store.SaveEvent(event)  // Success!

// If we tried version 0 or negative:
event.Version = 0
err = store.SaveEvent(event)  // FAIL: VersionConflictError (0 <= current version)

Example 4: Version Gaps

// Actor with versions 1, 3, 5 (gaps are OK; error handling omitted for brevity)
store.SaveEvent(&aether.Event{ActorID: "actor-1", Version: 1, ...})
store.SaveEvent(&aether.Event{ActorID: "actor-1", Version: 3, ...})  // Gap of 2, OK
store.SaveEvent(&aether.Event{ActorID: "actor-1", Version: 5, ...})  // Gap of 2, OK

// Gaps don't cause conflicts
version, _ := store.GetLatestVersion("actor-1")  // Returns 5

Glossary

| Term | Definition |
| --- | --- |
| Actor | An entity with independent event stream and version sequence (e.g., order, user, subscription) |
| Version | Monotonically increasing integer per-actor, uniquely identifying event in sequence |
| Monotonic | Strictly increasing (each value > previous) |
| Conflict | Write attempt with version <= current version (rejected) |
| Optimistic Concurrency | No locks; detect conflicts at write time; let readers proceed |
| Invariant | Business rule that must always be true (cannot be violated by code) |
| Aggregate | Cluster of entities enforcing invariants; transactional boundary |
| Event Store | Persistence layer for events; enforces invariants |
| Retry | Attempt write again after conflict (application decides strategy) |
| Backoff | Wait time before retry (typically exponential) |
| GetLatestVersion | Read-only query returning current version or 0 |
| SaveEvent | Write command; returns error if version conflict |

References

Code Files:

  • Event definition: /Users/hugo.nijhuis/src/github/flowmade-one/aether/event.go (lines 9-29, 177-207)
  • InMemoryEventStore: /Users/hugo.nijhuis/src/github/flowmade-one/aether/store/memory.go (lines 27-55)
  • JetStreamEventStore: /Users/hugo.nijhuis/src/github/flowmade-one/aether/store/jetstream.go (lines 122-163, 280-298)
  • Tests: /Users/hugo.nijhuis/src/github/flowmade-one/aether/store/jetstream_integration_test.go

Documentation:

  • CLAUDE.md: Optimistic Concurrency Pattern (with code example)
  • Vision: /Users/hugo.nijhuis/src/github/flowmade-one/aether/vision.md

Key Code Snippets:

  • VersionConflictError: event.go lines 14-29
  • SaveEvent implementation: store/jetstream.go lines 119-163
  • GetLatestVersion implementation: store/jetstream.go lines 280-298