diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..44a7079 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,235 @@ +# Aether Examples + +Standard patterns and best practices for building with Aether. + +## Version Conflict Retry Patterns + +When using optimistic concurrency control with Aether's event store, version conflicts can occur when multiple writers attempt to save events for the same actor. The `VersionConflictError` provides full context about the conflict, enabling intelligent retry strategies. + +### Understanding Version Conflicts + +A version conflict occurs when: +- You attempt to save an event with version `N` +- But the actor already has a version >= `N` + +Example: +```go +// Actor "order-123" currently has version 5 +// Writer A reads version 5, creates version 6, saves successfully +// Writer B also read version 5, creates version 6, attempts save +// -> VersionConflictError: current=6, attempted=6 +``` + +### Working with VersionConflictError + +The `VersionConflictError` provides: +- `ActorID` - The actor that had the conflict +- `CurrentVersion` - The actual current version in the store +- `AttemptedVersion` - The version you tried to save + +Example usage: +```go +err := eventStore.SaveEvent(event) +if errors.Is(err, aether.ErrVersionConflict) { + var versionErr *aether.VersionConflictError + if errors.As(err, &versionErr) { + fmt.Printf("Conflict for actor %q: current=%d, attempted=%d", + versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion) + // Implement retry logic using CurrentVersion + nextVersion := versionErr.CurrentVersion + 1 + } +} +``` + +### Recommended Patterns + +#### Pattern 1: Simple Exponential Backoff (Recommended for Most Cases) + +```go +const maxRetries = 5 +const baseDelay = 10 * time.Millisecond + +for attempt := 0; attempt < maxRetries; attempt++ { + currentVersion, _ := eventStore.GetLatestVersion(actorID) + + event := &aether.Event{ + ActorID: actorID, + 
Version: currentVersion + 1, + // ... + } + + err := eventStore.SaveEvent(event) + if err == nil { + return nil // Success! + } + + if !errors.Is(err, aether.ErrVersionConflict) { + return err // Different error, don't retry + } + + // Exponential backoff: 10ms, 20ms, 40ms, 80ms, 160ms + delay := time.Duration(baseDelay.Milliseconds() * int64(math.Pow(2, float64(attempt)))) * time.Millisecond + time.Sleep(delay) +} +return fmt.Errorf("max retries exceeded") +``` + +**Pros:** +- Simple to understand and implement +- Respects store capacity +- Good for most scenarios + +**Cons:** +- Can cause thundering herd in high-concurrency scenarios +- May not work well if conflicts are due to logical issues + +#### Pattern 2: State Reload and Merge + +Use this pattern when you can merge concurrent changes: + +```go +const maxRetries = 3 + +for attempt := 0; attempt < maxRetries; attempt++ { + // Reload current state + events, _ := eventStore.GetEvents(actorID, 0) + aggregate := rebuildFromEvents(events) + + // Apply your update + aggregate.Status = "shipped" + + // Attempt save with new version + event := &aether.Event{ + ActorID: actorID, + Version: aggregate.Version + 1, + Data: map[string]interface{}{"status": aggregate.Status}, + } + + err := eventStore.SaveEvent(event) + if err == nil { + return nil // Success! 
+ } + + if !errors.Is(err, aether.ErrVersionConflict) { + return err + } + + // Reload and retry (loop continues) +} +``` + +**Pros:** +- Each attempt starts from freshly reloaded state (success is still bounded by `maxRetries`) +- Can merge concurrent updates +- Good for business logic that's idempotent + +**Cons:** +- More expensive (replaying events each attempt) +- Only works if updates can be safely retried + +#### Pattern 3: Circuit Breaker for Cascading Failures + +Use when you want to avoid hammering a saturated store: + +```go +type CircuitBreaker struct { + state string // "closed", "open", "half-open" + failures int + failureThreshold int + lastFailureTime time.Time + cooldownTime time.Duration +} + +// ... implement circuit breaker logic ... + +// Usage: +if !cb.canAttempt() { + return fmt.Errorf("circuit breaker open") +} + +err := eventStore.SaveEvent(event) +if err == nil { + cb.recordSuccess() +} else if errors.Is(err, aether.ErrVersionConflict) { + cb.recordFailure() + if cb.failures >= cb.failureThreshold { + cb.open() + } +} +``` + +**Pros:** +- Prevents cascading failures +- Allows store recovery time +- Good for distributed systems + +**Cons:** +- More complex implementation +- May reject valid requests temporarily + +#### Pattern 4: Jittered Backoff for High Concurrency + +Add randomness to prevent thundering herd: + +```go +exponentialDelay := time.Duration(baseDelay.Milliseconds() * int64(math.Pow(2, float64(attempt)))) * time.Millisecond +jitter := time.Duration(rand.Int63n(int64(exponentialDelay))) +delay := exponentialDelay + jitter +time.Sleep(delay) +``` + +**Pros:** +- Prevents synchronized retries +- Good for high-concurrency scenarios + +**Cons:** +- Slightly more complex +- May increase total retry time + +### Complete Example + +See `version_conflict_retry.go` for complete, runnable examples of all patterns (the file numbers them differently: the circuit breaker is Example 2 and state reload is Example 3). 
+ +### When to Use Each Pattern + +| Pattern | Use When | Avoid When | +|---------|----------|-----------| +| Exponential Backoff | Default choice for most apps | Store is consistently overloaded | +| State Reload | Updates can be safely replayed | Event replay is expensive | +| Circuit Breaker | Store is frequently saturated | You need immediate feedback | +| Jittered Backoff | Many concurrent writers | Single-threaded app | + +### Monitoring Version Conflicts + +Log and monitor version conflicts to understand contention patterns: + +```go +var versionErr *aether.VersionConflictError +if errors.As(err, &versionErr) { + log.WithFields(log.Fields{ + "actor_id": versionErr.ActorID, + "current_version": versionErr.CurrentVersion, + "attempted_version": versionErr.AttemptedVersion, + "version_gap": versionErr.CurrentVersion - versionErr.AttemptedVersion, + }).Warn("Version conflict") + + // Alert if gap is too large (the writer was working from a stale read) + if versionErr.CurrentVersion - versionErr.AttemptedVersion > 5 { + metrics.versionConflictLargeGap.Inc() + } +} +``` + +### Best Practices + +1. **Always check the error type** - Not all errors are version conflicts +2. **Use CurrentVersion for retries** - Compute the next version from the store's current version; never hardcode it +3. **Set reasonable retry limits** - Prevent infinite loops +4. **Monitor contention** - Track version conflicts to identify hotspots +5. **Consider your domain** - Some updates can be safely retried, others cannot +6. 
**Test concurrent scenarios** - Version conflicts are rare in single-threaded apps + +### References + +- [CLAUDE.md](../CLAUDE.md) - Architecture and event versioning semantics +- [Event Sourcing Patterns](../vision.md) - Domain-driven design approach diff --git a/examples/version_conflict_retry.go b/examples/version_conflict_retry.go new file mode 100644 index 0000000..ddd11a2 --- /dev/null +++ b/examples/version_conflict_retry.go @@ -0,0 +1,356 @@ +package main + +import ( + "errors" + "fmt" + "log" + "math" + "math/rand" + "time" + + "git.flowmade.one/flowmade-one/aether" + "git.flowmade.one/flowmade-one/aether/store" + "github.com/google/uuid" +) + +// Example 1: Simple Retry with Exponential Backoff +// This is the most common pattern for handling version conflicts. +func simpleRetryWithExponentialBackoff(eventStore aether.EventStore, actorID string) error { + const maxRetries = 5 + const baseDelay = 10 * time.Millisecond + + for attempt := 0; attempt < maxRetries; attempt++ { + // Get current version + currentVersion, err := eventStore.GetLatestVersion(actorID) + if err != nil { + return fmt.Errorf("failed to get latest version: %w", err) + } + + // Create event with next version + event := &aether.Event{ + ID: uuid.New().String(), + EventType: "OrderUpdated", + ActorID: actorID, + Version: currentVersion + 1, + Data: map[string]interface{}{"status": "processing"}, + Timestamp: time.Now(), + } + + // Attempt to save + err = eventStore.SaveEvent(event) + if err == nil { + // Success! 
+ return nil + } + + // Check if it's a version conflict + if !errors.Is(err, aether.ErrVersionConflict) { + // Different error - don't retry + return err + } + + // Version conflict - extract details for logging + var versionErr *aether.VersionConflictError + if errors.As(err, &versionErr) { + log.Printf("Attempt %d: Version conflict for actor %q: current=%d, attempted=%d", + attempt+1, versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion) + } + + // Last attempt - return error + if attempt == maxRetries-1 { + return fmt.Errorf("failed to save event after %d attempts: %w", maxRetries, err) + } + + // Calculate delay with exponential backoff: baseDelay * 2^attempt + delay := time.Duration(baseDelay.Milliseconds()*int64(math.Pow(2, float64(attempt)))) * time.Millisecond + log.Printf("Retrying in %v...", delay) + time.Sleep(delay) + } + + return nil +} + +// Example 2: Retry with Circuit Breaker Pattern +// Use a circuit breaker to avoid hammering the store during cascading failures. 
+type CircuitBreaker struct { + failureThreshold int + cooldownTime time.Duration + failures int + lastFailureTime time.Time + state string // "closed", "open", "half-open" +} + +func (cb *CircuitBreaker) canAttempt() bool { + switch cb.state { + case "closed": + return true + case "open": + // Check if we should transition to half-open + if time.Since(cb.lastFailureTime) > cb.cooldownTime { + cb.state = "half-open" + return true + } + return false + case "half-open": + return true + } + return false +} + +func (cb *CircuitBreaker) recordSuccess() { + cb.failures = 0 + cb.state = "closed" +} + +func (cb *CircuitBreaker) recordFailure() { + cb.failures++ + cb.lastFailureTime = time.Now() + if cb.failures >= cb.failureThreshold { + cb.state = "open" + } +} + +func retryWithCircuitBreaker(eventStore aether.EventStore, actorID string) error { + cb := &CircuitBreaker{ + failureThreshold: 5, + cooldownTime: 1 * time.Second, + state: "closed", + } + + const maxAttempts = 10 + + for attempt := 0; attempt < maxAttempts; attempt++ { + if !cb.canAttempt() { + return fmt.Errorf("circuit breaker is open for actor %q", actorID) + } + + // Get current version + currentVersion, err := eventStore.GetLatestVersion(actorID) + if err != nil { + return fmt.Errorf("failed to get latest version: %w", err) + } + + // Create event with next version + event := &aether.Event{ + ID: uuid.New().String(), + EventType: "OrderUpdated", + ActorID: actorID, + Version: currentVersion + 1, + Data: map[string]interface{}{"status": "processing"}, + Timestamp: time.Now(), + } + + // Attempt to save + err = eventStore.SaveEvent(event) + if err == nil { + cb.recordSuccess() + return nil + } + + // Check if it's a version conflict + if !errors.Is(err, aether.ErrVersionConflict) { + // Different error - don't retry + return err + } + + cb.recordFailure() + + // Version conflict - extract details + var versionErr *aether.VersionConflictError + if errors.As(err, &versionErr) { + log.Printf("Circuit breaker 
(state=%s): Version conflict for actor %q", + cb.state, versionErr.ActorID) + } + + if attempt < maxAttempts-1 { + time.Sleep(10 * time.Millisecond) + } + } + + return fmt.Errorf("max retry attempts exceeded for actor %q", actorID) +} + +// Example 3: Deterministic Retry - Always Succeed by Reloading Latest State +// Some applications can afford to reload state and merge changes. +type OrderAggregate struct { + ID string + Version int64 + Status string + Amount float64 + Comments []string +} + +func retryWithStateReload(eventStore aether.EventStore, orderID string, updateFn func(*OrderAggregate) error) error { + const maxRetries = 3 + + for attempt := 0; attempt < maxRetries; attempt++ { + // Load current state by replaying events + events, err := eventStore.GetEvents(orderID, 0) + if err != nil { + return fmt.Errorf("failed to load events: %w", err) + } + + // Rebuild aggregate from events + aggregate := &OrderAggregate{ID: orderID} + for _, e := range events { + aggregate.Version = e.Version + // Apply event to aggregate (simplified) + } + + // Apply update + if err := updateFn(aggregate); err != nil { + return fmt.Errorf("update function failed: %w", err) + } + + // Create event for the update + event := &aether.Event{ + ID: uuid.New().String(), + EventType: "OrderUpdated", + ActorID: orderID, + Version: aggregate.Version + 1, + Data: map[string]interface{}{"status": aggregate.Status}, + Timestamp: time.Now(), + } + + // Attempt to save + err = eventStore.SaveEvent(event) + if err == nil { + // Success! + log.Printf("Order %q updated with version %d", orderID, event.Version) + return nil + } + + // Check if it's a version conflict + if !errors.Is(err, aether.ErrVersionConflict) { + // Different error - don't retry + return err + } + + var versionErr *aether.VersionConflictError + if errors.As(err, &versionErr) { + log.Printf("Version conflict on attempt %d: current=%d, attempted=%d. 
Retrying...", + attempt+1, versionErr.CurrentVersion, versionErr.AttemptedVersion) + } + + if attempt < maxRetries-1 { + // Brief delay before retry + time.Sleep(time.Duration(50*(attempt+1)) * time.Millisecond) + } + } + + return fmt.Errorf("failed to update order %q after %d retries", orderID, maxRetries) +} + +// Example 4: Jittered Retry +// Add randomness to prevent thundering herd problem. +func retryWithJitteredBackoff(eventStore aether.EventStore, actorID string) error { + const maxRetries = 5 + const baseDelay = 10 * time.Millisecond + + for attempt := 0; attempt < maxRetries; attempt++ { + // Get current version + currentVersion, err := eventStore.GetLatestVersion(actorID) + if err != nil { + return fmt.Errorf("failed to get latest version: %w", err) + } + + // Create event with next version + event := &aether.Event{ + ID: uuid.New().String(), + EventType: "OrderUpdated", + ActorID: actorID, + Version: currentVersion + 1, + Data: map[string]interface{}{"status": "processing"}, + Timestamp: time.Now(), + } + + // Attempt to save + err = eventStore.SaveEvent(event) + if err == nil { + return nil + } + + // Check if it's a version conflict + if !errors.Is(err, aether.ErrVersionConflict) { + return err + } + + if attempt == maxRetries-1 { + return err + } + + // Calculate delay with exponential backoff + jitter + exponentialDelay := time.Duration(baseDelay.Milliseconds()*int64(math.Pow(2, float64(attempt)))) * time.Millisecond + jitter := time.Duration(rand.Int63n(int64(exponentialDelay))) // Random jitter up to exponential delay + delay := exponentialDelay + jitter + + log.Printf("Retrying in %v with jitter...", delay) + time.Sleep(delay) + } + + return nil +} + +// Example 5: Analyzing Version Conflict Context +// Extract all relevant information from a version conflict for debugging/monitoring. 
+func analyzeVersionConflict(err error) { + if !errors.Is(err, aether.ErrVersionConflict) { + log.Println("Not a version conflict error") + return + } + + var versionErr *aether.VersionConflictError + if errors.As(err, &versionErr) { + log.Printf("Version Conflict Details:") + log.Printf(" Actor ID: %s", versionErr.ActorID) + log.Printf(" Current Version: %d", versionErr.CurrentVersion) + log.Printf(" Attempted Version: %d", versionErr.AttemptedVersion) + log.Printf(" Version Gap: %d", versionErr.AttemptedVersion-versionErr.CurrentVersion) + log.Printf(" Error Message: %s", versionErr.Error()) + + // Application-specific logic + if versionErr.CurrentVersion > versionErr.AttemptedVersion { + log.Printf("WARNING: Attempted version %d is behind current version %d", + versionErr.AttemptedVersion, versionErr.CurrentVersion) + } + } +} + +// Example usage showing all patterns +func main() { + // Create in-memory event store for demonstration + eventStore := store.NewInMemoryEventStore() + actorID := "order-123" + + fmt.Println("=== Version Conflict Retry Patterns ===\n") + + // Demonstrate patterns (using simplified versions) + log.Println("Pattern 1: Simple Exponential Backoff") + if err := simpleRetryWithExponentialBackoff(eventStore, actorID); err != nil { + log.Printf("Error: %v\n", err) + } + + log.Println("\nPattern 3: State Reload") + if err := retryWithStateReload(eventStore, actorID, func(agg *OrderAggregate) error { + agg.Status = "shipped" + return nil + }); err != nil { + log.Printf("Error: %v\n", err) + } + + log.Println("\nPattern 4: Jittered Backoff") + if err := retryWithJitteredBackoff(eventStore, actorID); err != nil { + log.Printf("Error: %v\n", err) + } + + fmt.Println("\n=== Recommended Pattern ===") + fmt.Println("Use exponential backoff (Pattern 1) for most cases:") + fmt.Println("- Simple to understand and implement") + fmt.Println("- Respects the store's capacity") + fmt.Println("- Avoids thundering herd (add jitter for high concurrency)") + 
fmt.Println("") + fmt.Println("Use state reload (Pattern 3) when:") + fmt.Println("- You can merge concurrent changes") + fmt.Println("- Deterministic success is required") + fmt.Println("- Replaying events is not expensive") +}