package examples import ( "errors" "fmt" "log" "math" "math/rand" "time" "git.flowmade.one/flowmade-one/aether" ) // SimpleRetryPattern demonstrates a basic retry loop using VersionConflictError. // // This pattern is suitable for scenarios where you want to automatically retry // with exponential backoff when version conflicts occur. func SimpleRetryPattern(store aether.EventStore, actorID string, eventType string) error { const maxRetries = 3 const initialBackoff = 100 * time.Millisecond var event *aether.Event for attempt := 0; attempt < maxRetries; attempt++ { if attempt > 0 { backoff := time.Duration(math.Pow(2, float64(attempt-1))) * initialBackoff log.Printf("Retry attempt %d after %v", attempt, backoff) time.Sleep(backoff) } // Get the current version for the actor currentVersion, err := store.GetLatestVersion(actorID) if err != nil { return fmt.Errorf("failed to get latest version: %w", err) } // Create event with next version event = &aether.Event{ ID: fmt.Sprintf("evt-%d-%d", time.Now().UnixNano(), attempt), EventType: eventType, ActorID: actorID, Version: currentVersion + 1, Data: map[string]interface{}{"attempt": attempt}, Timestamp: time.Now(), } // Attempt to save if err := store.SaveEvent(event); err == nil { log.Printf("Successfully saved event for actor %s at version %d", actorID, event.Version) return nil } else if !errors.Is(err, aether.ErrVersionConflict) { // Some other error occurred return fmt.Errorf("save event failed: %w", err) } // If it's a version conflict, loop will retry } return fmt.Errorf("failed to save event after %d retries", maxRetries) } // ConflictDetailedRetryPattern demonstrates how to extract detailed information // from VersionConflictError to make intelligent retry decisions. // // This pattern shows how to log detailed context and potentially implement // circuit-breaker logic based on the conflict information. func ConflictDetailedRetryPattern(store aether.EventStore, actorID string, eventType string) error { const maxRetries = 5 var lastConflictVersion int64 for attempt := 0; attempt < maxRetries; attempt++ { // Get current version currentVersion, err := store.GetLatestVersion(actorID) if err != nil { return err } // Create event event := &aether.Event{ ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()), EventType: eventType, ActorID: actorID, Version: currentVersion + 1, Data: map[string]interface{}{"timestamp": time.Now()}, Timestamp: time.Now(), } // Attempt to save err = store.SaveEvent(event) if err == nil { return nil // Success } // Check if it's a version conflict var versionErr *aether.VersionConflictError if !errors.As(err, &versionErr) { // Not a version conflict, fail immediately return err } // Extract detailed context from the conflict error log.Printf( "Version conflict for actor %q: attempted version %d, current version %d", versionErr.ActorID, versionErr.AttemptedVersion, versionErr.CurrentVersion, ) // Check for thrashing (multiple conflicts with same version) if lastConflictVersion == versionErr.CurrentVersion && attempt > 0 { log.Printf("Detected version thrashing - circuit breaker would trigger here") return fmt.Errorf("circuit breaker: too many conflicts at version %d", versionErr.CurrentVersion) } lastConflictVersion = versionErr.CurrentVersion // Exponential backoff backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond time.Sleep(backoff) } return fmt.Errorf("failed after %d retries", maxRetries) } // JitterRetryPattern implements exponential backoff with jitter to prevent // thundering herd when multiple writers retry simultaneously. func JitterRetryPattern(store aether.EventStore, actorID string, eventType string) error { const maxRetries = 3 const baseBackoff = 100 * time.Millisecond const maxJitter = 0.1 // 10% jitter for attempt := 0; attempt < maxRetries; attempt++ { currentVersion, err := store.GetLatestVersion(actorID) if err != nil { return err } event := &aether.Event{ ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()), EventType: eventType, ActorID: actorID, Version: currentVersion + 1, Data: map[string]interface{}{}, Timestamp: time.Now(), } err = store.SaveEvent(event) if err == nil { return nil } if !errors.Is(err, aether.ErrVersionConflict) { return err } // Exponential backoff with jitter exponentialBackoff := time.Duration(math.Pow(2, float64(attempt))) * baseBackoff jitter := time.Duration(rand.Float64() * float64(exponentialBackoff) * maxJitter) totalBackoff := exponentialBackoff + jitter log.Printf("Retrying in %v (attempt %d/%d)", totalBackoff, attempt+1, maxRetries) time.Sleep(totalBackoff) } return fmt.Errorf("failed after %d retries", maxRetries) } // AdaptiveRetryPattern adjusts retry strategy based on version conflict patterns. // // This pattern demonstrates how application logic can use CurrentVersion to // decide whether to retry, give up, or escalate to a higher-level handler. func AdaptiveRetryPattern(store aether.EventStore, actorID string, eventType string) error { const maxRetries = 3 for attempt := 0; attempt < maxRetries; attempt++ { currentVersion, err := store.GetLatestVersion(actorID) if err != nil { return err } event := &aether.Event{ ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()), EventType: eventType, ActorID: actorID, Version: currentVersion + 1, Data: map[string]interface{}{}, Timestamp: time.Now(), } err = store.SaveEvent(event) if err == nil { return nil } var versionErr *aether.VersionConflictError if !errors.As(err, &versionErr) { return err } // Adaptive backoff based on version distance versionDistance := versionErr.CurrentVersion - versionErr.AttemptedVersion if versionDistance > 10 { // Many concurrent writers - back off more aggressively log.Printf("High contention detected (gap: %d), aggressive backoff", versionDistance) time.Sleep(time.Duration(versionDistance*10) * time.Millisecond) } else if versionDistance > 3 { // Moderate contention - normal backoff log.Printf("Moderate contention detected (gap: %d)", versionDistance) time.Sleep(time.Duration(versionDistance) * time.Millisecond) } else { // Light contention - minimal backoff log.Printf("Light contention detected") time.Sleep(50 * time.Millisecond) } } return fmt.Errorf("failed after %d retries", maxRetries) } // EventualConsistencyPattern demonstrates how to handle version conflicts // in an eventually consistent manner by publishing to a retry queue. // // This is useful when immediate retry is not feasible, and you want to // defer the operation to a background worker. type RetryQueueItem struct { Event *aether.Event ConflictVersion int64 ConflictAttempted int64 NextRetryTime time.Time FailureCount int } func EventualConsistencyPattern(store aether.EventStore, retryQueue chan<- RetryQueueItem, event *aether.Event) { err := store.SaveEvent(event) if err == nil { return } var versionErr *aether.VersionConflictError if !errors.As(err, &versionErr) { log.Printf("Non-retryable error: %v", err) return } // Queue for retry - background worker will process this retryItem := RetryQueueItem{ Event: event, ConflictVersion: versionErr.CurrentVersion, ConflictAttempted: versionErr.AttemptedVersion, NextRetryTime: time.Now().Add(1 * time.Second), FailureCount: 0, } select { case retryQueue <- retryItem: log.Printf("Queued event for retry: actor=%s", event.ActorID) case <-time.After(5 * time.Second): log.Printf("Failed to queue event for retry (queue full)") } } // CircuitBreakerPattern implements a simple circuit breaker for version conflicts. // // The circuit breaker tracks failure rates and temporarily stops retrying // when the failure rate gets too high, allowing the system to recover. type CircuitBreaker struct { failureCount int successCount int state string // "closed", "open", "half-open" lastFailureTime time.Time openDuration time.Duration failureThreshold int successThreshold int } func NewCircuitBreaker() *CircuitBreaker { return &CircuitBreaker{ state: "closed", openDuration: 30 * time.Second, failureThreshold: 5, successThreshold: 3, } } func (cb *CircuitBreaker) RecordSuccess() { if cb.state == "half-open" { cb.successCount++ if cb.successCount >= cb.successThreshold { cb.state = "closed" cb.failureCount = 0 cb.successCount = 0 log.Printf("Circuit breaker closed") } } } func (cb *CircuitBreaker) RecordFailure() { cb.lastFailureTime = time.Now() cb.failureCount++ if cb.failureCount >= cb.failureThreshold { cb.state = "open" log.Printf("Circuit breaker opened") } } func (cb *CircuitBreaker) CanRetry() bool { if cb.state == "closed" { return true } if cb.state == "open" { if time.Since(cb.lastFailureTime) > cb.openDuration { cb.state = "half-open" cb.failureCount = 0 cb.successCount = 0 log.Printf("Circuit breaker half-open") return true } return false } // half-open state allows retries return true } func CircuitBreakerRetryPattern(store aether.EventStore, cb *CircuitBreaker, actorID string, eventType string) error { if !cb.CanRetry() { return fmt.Errorf("circuit breaker open - not retrying") } currentVersion, err := store.GetLatestVersion(actorID) if err != nil { return err } event := &aether.Event{ ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()), EventType: eventType, ActorID: actorID, Version: currentVersion + 1, Data: map[string]interface{}{}, Timestamp: time.Now(), } err = store.SaveEvent(event) if err == nil { cb.RecordSuccess() return nil } if !errors.Is(err, aether.ErrVersionConflict) { return err } cb.RecordFailure() return fmt.Errorf("save failed with version conflict, circuit breaker status: %s", cb.state) }