Hugo Nijhuis b481dae0b6
feat: implement cross-node event broadcasting with NATSEventBus (#151)
This PR implements cross-node event broadcasting for aether.

Changes:
- UpdateVersionCache method in JetStreamEventStore
- SubscribeToEventStored helper in NATSEventBus
- Integration tests for cross-node scenarios
- Example code demonstrating NATSEventBus + JetStreamEventStore

Tests: All integration tests passing.
Co-authored-by: Claude Code <noreply@anthropic.com>
Co-authored-by: Hugo Nijhuis <hugo.nijhuis@flowmade.one>
Reviewed-on: #151
2026-05-17 15:29:52 +00:00

Aether Examples

This directory contains examples demonstrating common patterns for using Aether.

Retry Patterns (retry_patterns.go)

When saving events with optimistic concurrency control, your application may encounter VersionConflictError when multiple writers attempt to update the same actor concurrently. This file demonstrates several retry strategies.

Pattern Overview

All retry patterns work with VersionConflictError, which exposes three fields:

  • ActorID: The actor that experienced the conflict
  • CurrentVersion: The latest version in the store
  • AttemptedVersion: The version you tried to save

Your application can read these fields to make intelligent retry decisions.

Available Patterns

SimpleRetryPattern

The most basic pattern retries with exponential backoff:

// Automatically retries up to 3 times with exponential backoff
err := SimpleRetryPattern(store, "order-123", "OrderUpdated")

Use when: You want a straightforward retry mechanism without complex logic.
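As a rough illustration of what retrying with exponential backoff involves, here is a minimal, self-contained sketch. It does not reproduce the code in retry_patterns.go: errConflict, backoffFor and retryWithBackoff are hypothetical names, and errConflict only stands in for aether.ErrVersionConflict.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errConflict is a hypothetical stand-in for aether.ErrVersionConflict.
var errConflict = errors.New("version conflict")

// backoffFor returns base * 2^attempt, where attempt is zero-based.
func backoffFor(base time.Duration, attempt int) time.Duration {
	return base << uint(attempt)
}

// retryWithBackoff calls op up to maxAttempts times. It sleeps between
// attempts only when op reports a conflict; any other error is returned
// immediately.
func retryWithBackoff(maxAttempts int, base time.Duration, op func() error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err = op()
		if err == nil {
			return nil
		}
		if !errors.Is(err, errConflict) {
			return err
		}
		// No point sleeping after the final attempt.
		if attempt < maxAttempts-1 {
			time.Sleep(backoffFor(base, attempt))
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	err := retryWithBackoff(3, time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errConflict // simulate two conflicts, then success
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```

Passing the operation as a closure keeps the backoff logic independent of how the event is built and saved.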

ConflictDetailedRetryPattern

Extracts detailed information from the conflict error to make smarter decisions:

// Detects thrashing (multiple conflicts at same version)
// and can implement circuit-breaker logic
err := ConflictDetailedRetryPattern(store, "order-123", "OrderUpdated")

Use when: You need visibility into conflict patterns and want to detect system issues like thrashing.
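One way to detect thrashing is to count consecutive conflicts that report the same CurrentVersion. The sketch below assumes that definition; the conflict struct mirrors the three fields listed earlier, while thrashDetector, observe and the threshold value are hypothetical and not taken from retry_patterns.go.

```go
package main

import "fmt"

// conflict is a hypothetical stand-in carrying the same three fields
// that the README lists for VersionConflictError.
type conflict struct {
	ActorID          string
	CurrentVersion   int64
	AttemptedVersion int64
}

// thrashDetector counts consecutive conflicts that report the same
// CurrentVersion.
type thrashDetector struct {
	lastVersion int64
	repeats     int
	threshold   int // assumed tuning knob: repeats needed to flag thrashing
}

// observe records one conflict and reports whether the number of
// consecutive conflicts at the same version has reached the threshold.
func (d *thrashDetector) observe(c conflict) bool {
	if d.repeats > 0 && c.CurrentVersion == d.lastVersion {
		d.repeats++
	} else {
		// First conflict, or the store moved on: start counting again.
		d.lastVersion = c.CurrentVersion
		d.repeats = 1
	}
	return d.repeats >= d.threshold
}

func main() {
	d := &thrashDetector{threshold: 3}
	for i := 0; i < 3; i++ {
		c := conflict{ActorID: "order-123", CurrentVersion: 5, AttemptedVersion: 5}
		fmt.Println("thrashing:", d.observe(c))
	}
}
```

A caller could stop retrying, log a warning, or trip a circuit breaker once observe returns true.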

JitterRetryPattern

Adds randomized jitter to prevent "thundering herd" when multiple writers retry:

// Exponential backoff with jitter prevents synchronized retries
err := JitterRetryPattern(store, "order-123", "OrderUpdated")

Use when: You have high concurrency and want to prevent all writers from retrying at the same time.
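A common way to add jitter is to randomize part of each exponential delay so that writers that conflicted together do not wake up together. The following sketch picks a delay in the upper half of the exponential window. The function name and the choice of range are assumptions for illustration, not the behaviour of JitterRetryPattern.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitteredBackoff returns a random duration in [d/2, d), where
// d = base * 2^attempt and attempt is zero-based.
func jitteredBackoff(base time.Duration, attempt int) time.Duration {
	d := base << uint(attempt)
	half := d / 2
	if half <= 0 {
		return d // too small to randomize
	}
	return half + time.Duration(rand.Int63n(int64(half)))
}

func main() {
	for attempt := 0; attempt < 4; attempt++ {
		fmt.Printf("attempt %d: sleep %v\n", attempt, jitteredBackoff(50*time.Millisecond, attempt))
	}
}
```

Keeping a lower bound of d/2 preserves most of the exponential growth while still spreading retries out. Drawing from the full range [0, d) spreads them further at the cost of occasional near-immediate retries.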

AdaptiveRetryPattern

Adjusts backoff duration based on version distance (indicator of contention):

// Light contention (gap=1): 50ms backoff
// Moderate contention (gap=3-10): proportional backoff
// High contention (gap>10): aggressive backoff
err := AdaptiveRetryPattern(store, "order-123", "OrderUpdated")

Use when: You want backoff strategy to respond to actual system load.
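The three tiers described above could be mapped to delays as in the sketch below. Only the 50ms figure for light contention comes from this README. The per-version step, the cap for heavy contention, and the treatment of gap=2 as light are assumptions chosen for illustration, and how the gap is derived from CurrentVersion and AttemptedVersion is left to the caller.

```go
package main

import (
	"fmt"
	"time"
)

const (
	lightBackoff = 50 * time.Millisecond  // from the README comment for gap=1
	perVersion   = 25 * time.Millisecond  // assumption: step per version of gap
	heavyBackoff = 500 * time.Millisecond // assumption: fixed delay for gap>10
)

// adaptiveBackoff maps a version gap to a delay using three tiers.
func adaptiveBackoff(gap int64) time.Duration {
	switch {
	case gap > 10:
		return heavyBackoff
	case gap >= 3:
		return time.Duration(gap) * perVersion
	default:
		// Gaps below 3 are treated as light contention (assumption).
		return lightBackoff
	}
}

func main() {
	for _, gap := range []int64{1, 3, 10, 25} {
		fmt.Printf("gap=%d -> %v\n", gap, adaptiveBackoff(gap))
	}
}
```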

EventualConsistencyPattern

Instead of blocking on retry, queues the event for asynchronous retry:

// Returns immediately, event is queued for background retry
EventualConsistencyPattern(store, retryQueue, event)

// Background worker processes the queue
for item := range retryQueue {
    // Implement your own retry logic here
}

Use when: You can't afford to block the request, and background retry is acceptable.
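The queue-and-worker shape described above can be sketched with a buffered channel and a non-blocking send. The types and function names here (pendingEvent, enqueue, drain) are hypothetical. The actual signature of EventualConsistencyPattern and the element type of retryQueue are defined in retry_patterns.go and may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingEvent is a hypothetical queue item.
type pendingEvent struct {
	ActorID   string
	EventType string
}

// enqueue tries to queue ev without blocking. It returns false when the
// queue is full, so the caller can decide whether to drop or report it.
func enqueue(q chan<- pendingEvent, ev pendingEvent) bool {
	select {
	case q <- ev:
		return true
	default:
		return false
	}
}

// drain calls save for every queued event until q is closed and returns
// how many saves succeeded.
func drain(q <-chan pendingEvent, save func(pendingEvent) error) int {
	saved := 0
	for ev := range q {
		if err := save(ev); err == nil {
			saved++
		}
	}
	return saved
}

func main() {
	q := make(chan pendingEvent, 2)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		n := drain(q, func(pendingEvent) error { return nil })
		fmt.Println("saved:", n)
	}()

	ok := enqueue(q, pendingEvent{ActorID: "order-123", EventType: "OrderUpdated"})
	fmt.Println("queued:", ok)
	close(q) // signal the worker that no more events are coming
	wg.Wait()
}
```

The non-blocking send is what keeps the request path fast. The trade-off is that a full queue must be handled explicitly rather than silently stalling the caller.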

CircuitBreakerPattern

Implements a circuit breaker to prevent cascading failures:

cb := NewCircuitBreaker()

// Fails fast when circuit is open
err := CircuitBreakerRetryPattern(store, cb, "order-123", "OrderUpdated")
if err != nil && !cb.CanRetry() {
    return ErrCircuitBreakerOpen
}

Use when: You have a distributed system and want to prevent retry storms during outages.
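For readers unfamiliar with the mechanism, the sketch below shows one minimal way a breaker with a CanRetry method could work: it opens after a number of consecutive failures and allows a trial request once a cooldown has elapsed. The breaker type, newBreaker, RecordFailure, RecordSuccess and both parameters are hypothetical. The real NewCircuitBreaker takes no arguments and its internals are not described in this README.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// breaker is a hypothetical count-based circuit breaker.
type breaker struct {
	mu        sync.Mutex
	failures  int           // consecutive failures seen so far
	threshold int           // failures needed to open the circuit
	cooldown  time.Duration // how long the circuit stays open
	openedAt  time.Time     // when the circuit last opened
}

func newBreaker(threshold int, cooldown time.Duration) *breaker {
	return &breaker{threshold: threshold, cooldown: cooldown}
}

// CanRetry reports whether a request may proceed: always while the
// circuit is closed, and again once the cooldown has elapsed.
func (b *breaker) CanRetry() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.failures < b.threshold {
		return true
	}
	return time.Since(b.openedAt) >= b.cooldown
}

// RecordFailure counts a failure and (re)opens the circuit at the threshold.
func (b *breaker) RecordFailure() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures++
	if b.failures >= b.threshold {
		b.openedAt = time.Now()
	}
}

// RecordSuccess closes the circuit and clears the failure count.
func (b *breaker) RecordSuccess() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures = 0
}

func main() {
	cb := newBreaker(2, time.Minute)
	cb.RecordFailure()
	cb.RecordFailure()
	fmt.Println("can retry after two failures:", cb.CanRetry())
	cb.RecordSuccess()
	fmt.Println("can retry after a success:", cb.CanRetry())
}
```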

Common Pattern: Extract and Log Context

All patterns can read context from VersionConflictError:

var versionErr *aether.VersionConflictError
if errors.As(err, &versionErr) {
    log.Printf(
        "Conflict for actor %q: attempted %d, current %d",
        versionErr.ActorID,
        versionErr.AttemptedVersion,
        versionErr.CurrentVersion,
    )
}

Sentinel Error Check

Check if an error is a version conflict without examining the struct:

if errors.Is(err, aether.ErrVersionConflict) {
    // This is a version conflict - retry is appropriate
}
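Both checks can succeed on the same error when a struct error type unwraps to a sentinel. The sketch below shows that general Go technique with hypothetical local types. It is not a copy of aether's implementation, which may achieve the same result differently (for example with an Is method).

```go
package main

import (
	"errors"
	"fmt"
)

// errVersionConflict is a hypothetical sentinel, standing in for
// aether.ErrVersionConflict.
var errVersionConflict = errors.New("version conflict")

// versionConflictError is a hypothetical struct error with the three
// fields this README describes.
type versionConflictError struct {
	ActorID          string
	CurrentVersion   int64
	AttemptedVersion int64
}

func (e *versionConflictError) Error() string {
	return fmt.Sprintf("version conflict for actor %q: attempted %d, current %d",
		e.ActorID, e.AttemptedVersion, e.CurrentVersion)
}

// Unwrap links the struct error to the sentinel so errors.Is matches it.
func (e *versionConflictError) Unwrap() error { return errVersionConflict }

// failingSave simulates a store call whose error is wrapped by a caller.
func failingSave() error {
	cause := &versionConflictError{ActorID: "order-123", CurrentVersion: 7, AttemptedVersion: 5}
	return fmt.Errorf("save failed: %w", cause)
}

// isConflict is the cheap check: no field access needed.
func isConflict(err error) bool { return errors.Is(err, errVersionConflict) }

// conflictDetails extracts the struct when the fields are needed.
func conflictDetails(err error) (*versionConflictError, bool) {
	var vc *versionConflictError
	ok := errors.As(err, &vc)
	return vc, ok
}

func main() {
	err := failingSave()
	fmt.Println("is conflict:", isConflict(err))
	if vc, ok := conflictDetails(err); ok {
		fmt.Println("current version:", vc.CurrentVersion)
	}
}
```

Both checks walk the wrap chain, so they keep working when intermediate layers add context with %w.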

Implementing Your Own Pattern

Basic template:

for attempt := 0; attempt < maxRetries; attempt++ {
    // 1. Get current version
    currentVersion, err := store.GetLatestVersion(actorID)
    if err != nil {
        return err
    }

    // 2. Create event with next version
    event := &aether.Event{
        ActorID:   actorID,
        Version:   currentVersion + 1,
        // ... other fields
    }

    // 3. Attempt save
    err = store.SaveEvent(event)
    if err == nil {
        return nil // Success
    }

    // 4. Check if it's a conflict
    if !errors.Is(err, aether.ErrVersionConflict) {
        return err // Some other error
    }

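    // 5. Implement your retry strategy
    time.Sleep(yourBackoff(attempt))
}

// 6. Every attempt ended in a version conflict
return fmt.Errorf("gave up after %d attempts: %w", maxRetries, aether.ErrVersionConflict)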
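To see the read, save, retry loop behave under real contention, the sketch below runs it against a tiny in-memory store with several concurrent writers. memStore and its methods are hypothetical stand-ins for an event store with optimistic concurrency, not aether types, and the backoff step is left out to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errConflict = errors.New("version conflict") // stand-in sentinel

// memStore is a hypothetical single-actor store that only accepts the
// next consecutive version.
type memStore struct {
	mu      sync.Mutex
	version int64
}

func (s *memStore) latest() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.version
}

func (s *memStore) save(version int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if version != s.version+1 {
		return errConflict
	}
	s.version = version
	return nil
}

// appendWithRetry re-reads the version and retries on each conflict.
func appendWithRetry(s *memStore, maxRetries int) error {
	for attempt := 0; attempt < maxRetries; attempt++ {
		err := s.save(s.latest() + 1)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errConflict) {
			return err
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxRetries, errConflict)
}

// runWriters starts n concurrent writers and returns how many succeeded.
func runWriters(s *memStore, n, maxRetries int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	succeeded := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if appendWithRetry(s, maxRetries) == nil {
				mu.Lock()
				succeeded++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return succeeded
}

func main() {
	s := &memStore{}
	ok := runWriters(s, 8, 100)
	fmt.Println("writers succeeded:", ok, "final version:", s.latest())
}
```

Whatever the interleaving, the final version equals the number of successful writers, which is the invariant optimistic concurrency is meant to protect.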

Choosing a Pattern

Pattern              Latency     Throughput  Complexity  Use Case
Simple               Low         Low         Very Low    Single writer, testing
ConflictDetailed     Low         Medium      Medium      Debugging, monitoring
Jitter               Low-Medium  High        Low         Multi-writer concurrency
Adaptive             Low-Medium  High        Medium      Variable load scenarios
EventualConsistency  Very Low    Very High   High        High-volume, async-OK workloads
CircuitBreaker       Variable    Stable      High        Distributed, failure-resilient systems

Performance Considerations

  1. Backoff timing: Shorter backoffs waste CPU on retries that are likely to conflict again; longer backoffs increase latency
  2. Retry limits: Too few retries give up too early; too many waste resources
  3. Jitter: Essential for preventing synchronized retries in high-concurrency scenarios
  4. Monitoring: Track retry rates and conflict patterns to detect system issues

Testing

Use store.NewInMemoryEventStore() in tests:

// Named eventStore so the variable does not shadow the store package
eventStore := store.NewInMemoryEventStore()
err := SimpleRetryPattern(eventStore, "test-actor", "TestEvent")
if err != nil {
    t.Fatalf("retry pattern failed: %v", err)
}