package main import ( "errors" "fmt" "log" "math" "math/rand" "time" "git.flowmade.one/flowmade-one/aether" "git.flowmade.one/flowmade-one/aether/store" "github.com/google/uuid" ) // Example 1: Simple Retry with Exponential Backoff // This is the most common pattern for handling version conflicts. func simpleRetryWithExponentialBackoff(eventStore aether.EventStore, actorID string) error { const maxRetries = 5 const baseDelay = 10 * time.Millisecond for attempt := 0; attempt < maxRetries; attempt++ { // Get current version currentVersion, err := eventStore.GetLatestVersion(actorID) if err != nil { return fmt.Errorf("failed to get latest version: %w", err) } // Create event with next version event := &aether.Event{ ID: uuid.New().String(), EventType: "OrderUpdated", ActorID: actorID, Version: currentVersion + 1, Data: map[string]interface{}{"status": "processing"}, Timestamp: time.Now(), } // Attempt to save err = eventStore.SaveEvent(event) if err == nil { // Success! return nil } // Check if it's a version conflict if !errors.Is(err, aether.ErrVersionConflict) { // Different error - don't retry return err } // Version conflict - extract details for logging var versionErr *aether.VersionConflictError if errors.As(err, &versionErr) { log.Printf("Attempt %d: Version conflict for actor %q: current=%d, attempted=%d", attempt+1, versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion) } // Last attempt - return error if attempt == maxRetries-1 { return fmt.Errorf("failed to save event after %d attempts: %w", maxRetries, err) } // Calculate delay with exponential backoff: baseDelay * 2^attempt delay := time.Duration(baseDelay.Milliseconds()*int64(math.Pow(2, float64(attempt)))) * time.Millisecond log.Printf("Retrying in %v...", delay) time.Sleep(delay) } return nil } // Example 2: Retry with Circuit Breaker Pattern // Use a circuit breaker to avoid hammering the store during cascading failures. type CircuitBreaker struct { failureThreshold int cooldownTime time.Duration failures int lastFailureTime time.Time state string // "closed", "open", "half-open" } func (cb *CircuitBreaker) canAttempt() bool { switch cb.state { case "closed": return true case "open": // Check if we should transition to half-open if time.Since(cb.lastFailureTime) > cb.cooldownTime { cb.state = "half-open" return true } return false case "half-open": return true } return false } func (cb *CircuitBreaker) recordSuccess() { cb.failures = 0 cb.state = "closed" } func (cb *CircuitBreaker) recordFailure() { cb.failures++ cb.lastFailureTime = time.Now() if cb.failures >= cb.failureThreshold { cb.state = "open" } } func retryWithCircuitBreaker(eventStore aether.EventStore, actorID string) error { cb := &CircuitBreaker{ failureThreshold: 5, cooldownTime: 1 * time.Second, state: "closed", } const maxAttempts = 10 for attempt := 0; attempt < maxAttempts; attempt++ { if !cb.canAttempt() { return fmt.Errorf("circuit breaker is open for actor %q", actorID) } // Get current version currentVersion, err := eventStore.GetLatestVersion(actorID) if err != nil { return fmt.Errorf("failed to get latest version: %w", err) } // Create event with next version event := &aether.Event{ ID: uuid.New().String(), EventType: "OrderUpdated", ActorID: actorID, Version: currentVersion + 1, Data: map[string]interface{}{"status": "processing"}, Timestamp: time.Now(), } // Attempt to save err = eventStore.SaveEvent(event) if err == nil { cb.recordSuccess() return nil } // Check if it's a version conflict if !errors.Is(err, aether.ErrVersionConflict) { // Different error - don't retry return err } cb.recordFailure() // Version conflict - extract details var versionErr *aether.VersionConflictError if errors.As(err, &versionErr) { log.Printf("Circuit breaker (state=%s): Version conflict for actor %q", cb.state, versionErr.ActorID) } if attempt < maxAttempts-1 { time.Sleep(10 * time.Millisecond) } } return fmt.Errorf("max retry attempts exceeded for actor %q", actorID) } // Example 3: Deterministic Retry - Always Succeed by Reloading Latest State // Some applications can afford to reload state and merge changes. type OrderAggregate struct { ID string Version int64 Status string Amount float64 Comments []string } func retryWithStateReload(eventStore aether.EventStore, orderID string, updateFn func(*OrderAggregate) error) error { const maxRetries = 3 for attempt := 0; attempt < maxRetries; attempt++ { // Load current state by replaying events events, err := eventStore.GetEvents(orderID, 0) if err != nil { return fmt.Errorf("failed to load events: %w", err) } // Rebuild aggregate from events aggregate := &OrderAggregate{ID: orderID} for _, e := range events { aggregate.Version = e.Version // Apply event to aggregate (simplified) } // Apply update if err := updateFn(aggregate); err != nil { return fmt.Errorf("update function failed: %w", err) } // Create event for the update event := &aether.Event{ ID: uuid.New().String(), EventType: "OrderUpdated", ActorID: orderID, Version: aggregate.Version + 1, Data: map[string]interface{}{"status": aggregate.Status}, Timestamp: time.Now(), } // Attempt to save err = eventStore.SaveEvent(event) if err == nil { // Success! log.Printf("Order %q updated with version %d", orderID, event.Version) return nil } // Check if it's a version conflict if !errors.Is(err, aether.ErrVersionConflict) { // Different error - don't retry return err } var versionErr *aether.VersionConflictError if errors.As(err, &versionErr) { log.Printf("Version conflict on attempt %d: current=%d, attempted=%d. Retrying...", attempt+1, versionErr.CurrentVersion, versionErr.AttemptedVersion) } if attempt < maxRetries-1 { // Brief delay before retry time.Sleep(time.Duration(50*(attempt+1)) * time.Millisecond) } } return fmt.Errorf("failed to update order %q after %d retries", orderID, maxRetries) } // Example 4: Jittered Retry // Add randomness to prevent thundering herd problem. func retryWithJitteredBackoff(eventStore aether.EventStore, actorID string) error { const maxRetries = 5 const baseDelay = 10 * time.Millisecond for attempt := 0; attempt < maxRetries; attempt++ { // Get current version currentVersion, err := eventStore.GetLatestVersion(actorID) if err != nil { return fmt.Errorf("failed to get latest version: %w", err) } // Create event with next version event := &aether.Event{ ID: uuid.New().String(), EventType: "OrderUpdated", ActorID: actorID, Version: currentVersion + 1, Data: map[string]interface{}{"status": "processing"}, Timestamp: time.Now(), } // Attempt to save err = eventStore.SaveEvent(event) if err == nil { return nil } // Check if it's a version conflict if !errors.Is(err, aether.ErrVersionConflict) { return err } if attempt == maxRetries-1 { return err } // Calculate delay with exponential backoff + jitter exponentialDelay := time.Duration(baseDelay.Milliseconds()*int64(math.Pow(2, float64(attempt)))) * time.Millisecond jitter := time.Duration(rand.Int63n(int64(exponentialDelay))) // Random jitter up to exponential delay delay := exponentialDelay + jitter log.Printf("Retrying in %v with jitter...", delay) time.Sleep(delay) } return nil } // Example 5: Analyzing Version Conflict Context // Extract all relevant information from a version conflict for debugging/monitoring. func analyzeVersionConflict(err error) { if !errors.Is(err, aether.ErrVersionConflict) { log.Println("Not a version conflict error") return } var versionErr *aether.VersionConflictError if errors.As(err, &versionErr) { log.Printf("Version Conflict Details:") log.Printf(" Actor ID: %s", versionErr.ActorID) log.Printf(" Current Version: %d", versionErr.CurrentVersion) log.Printf(" Attempted Version: %d", versionErr.AttemptedVersion) log.Printf(" Version Gap: %d", versionErr.AttemptedVersion-versionErr.CurrentVersion) log.Printf(" Error Message: %s", versionErr.Error()) // Application-specific logic if versionErr.CurrentVersion > versionErr.AttemptedVersion { log.Printf("WARNING: Attempted version %d is behind current version %d", versionErr.AttemptedVersion, versionErr.CurrentVersion) } } } // Example usage showing all patterns func main() { // Create in-memory event store for demonstration eventStore := store.NewInMemoryEventStore() actorID := "order-123" fmt.Println("=== Version Conflict Retry Patterns ===\n") // Demonstrate patterns (using simplified versions) log.Println("Pattern 1: Simple Exponential Backoff") if err := simpleRetryWithExponentialBackoff(eventStore, actorID); err != nil { log.Printf("Error: %v\n", err) } log.Println("\nPattern 3: State Reload") if err := retryWithStateReload(eventStore, actorID, func(agg *OrderAggregate) error { agg.Status = "shipped" return nil }); err != nil { log.Printf("Error: %v\n", err) } log.Println("\nPattern 4: Jittered Backoff") if err := retryWithJitteredBackoff(eventStore, actorID); err != nil { log.Printf("Error: %v\n", err) } fmt.Println("\n=== Recommended Pattern ===") fmt.Println("Use exponential backoff (Pattern 1) for most cases:") fmt.Println("- Simple to understand and implement") fmt.Println("- Respects the store's capacity") fmt.Println("- Avoids thundering herd (add jitter for high concurrency)") fmt.Println("") fmt.Println("Use state reload (Pattern 3) when:") fmt.Println("- You can merge concurrent changes") fmt.Println("- Deterministic success is required") fmt.Println("- Replaying events is not expensive") }