diff --git a/event_test.go b/event_test.go index 6590daf..a2f5911 100644 --- a/event_test.go +++ b/event_test.go @@ -2,6 +2,8 @@ package aether import ( "encoding/json" + "errors" + "fmt" "strings" "testing" "time" @@ -1335,3 +1337,190 @@ func TestReplayError_WithLargeRawData(t *testing.T) { // Error() should still work _ = err.Error() } + +// Tests for VersionConflictError + +func TestVersionConflictError_Error(t *testing.T) { + err := &VersionConflictError{ + ActorID: "order-123", + AttemptedVersion: 3, + CurrentVersion: 5, + } + + errMsg := err.Error() + + // Verify error message contains all context + if !strings.Contains(errMsg, "order-123") { + t.Errorf("error message should contain ActorID, got: %s", errMsg) + } + if !strings.Contains(errMsg, "3") { + t.Errorf("error message should contain AttemptedVersion, got: %s", errMsg) + } + if !strings.Contains(errMsg, "5") { + t.Errorf("error message should contain CurrentVersion, got: %s", errMsg) + } + if !strings.Contains(errMsg, "version conflict") { + t.Errorf("error message should contain 'version conflict', got: %s", errMsg) + } +} + +func TestVersionConflictError_Fields(t *testing.T) { + err := &VersionConflictError{ + ActorID: "actor-456", + AttemptedVersion: 10, + CurrentVersion: 8, + } + + if err.ActorID != "actor-456" { + t.Errorf("ActorID mismatch: got %q, want %q", err.ActorID, "actor-456") + } + if err.AttemptedVersion != 10 { + t.Errorf("AttemptedVersion mismatch: got %d, want %d", err.AttemptedVersion, 10) + } + if err.CurrentVersion != 8 { + t.Errorf("CurrentVersion mismatch: got %d, want %d", err.CurrentVersion, 8) + } +} + +func TestVersionConflictError_Unwrap(t *testing.T) { + err := &VersionConflictError{ + ActorID: "actor-789", + AttemptedVersion: 2, + CurrentVersion: 1, + } + + unwrapped := err.Unwrap() + if unwrapped != ErrVersionConflict { + t.Errorf("Unwrap should return ErrVersionConflict sentinel") + } +} + +func TestVersionConflictError_ErrorsIs(t *testing.T) { + err := &VersionConflictError{ + ActorID: "test-actor", + AttemptedVersion: 5, + CurrentVersion: 4, + } + + // Test that errors.Is works with sentinel + if !errors.Is(err, ErrVersionConflict) { + t.Error("errors.Is(err, ErrVersionConflict) should return true") + } + + // Test that other errors don't match + if errors.Is(err, errors.New("other error")) { + t.Error("errors.Is should not match unrelated errors") + } +} + +func TestVersionConflictError_ErrorsAs(t *testing.T) { + originalErr := &VersionConflictError{ + ActorID: "actor-unwrap", + AttemptedVersion: 7, + CurrentVersion: 6, + } + + var versionErr *VersionConflictError + if !errors.As(originalErr, &versionErr) { + t.Fatalf("errors.As should succeed with VersionConflictError") + } + + // Verify fields are accessible through unwrapped error + if versionErr.ActorID != "actor-unwrap" { + t.Errorf("ActorID mismatch after As: got %q", versionErr.ActorID) + } + if versionErr.AttemptedVersion != 7 { + t.Errorf("AttemptedVersion mismatch after As: got %d", versionErr.AttemptedVersion) + } + if versionErr.CurrentVersion != 6 { + t.Errorf("CurrentVersion mismatch after As: got %d", versionErr.CurrentVersion) + } +} + +func TestVersionConflictError_CanReadCurrentVersion(t *testing.T) { + // This test verifies that applications can read CurrentVersion for retry strategies + err := &VersionConflictError{ + ActorID: "order-abc", + AttemptedVersion: 2, + CurrentVersion: 10, + } + + var versionErr *VersionConflictError + if !errors.As(err, &versionErr) { + t.Fatal("failed to unwrap VersionConflictError") + } + + 
// Application can use CurrentVersion to decide retry strategy + nextVersion := versionErr.CurrentVersion + 1 + + if nextVersion != 11 { + t.Errorf("application should be able to compute next version: got %d, want 11", nextVersion) + } + + // Application can log detailed context + logMsg := fmt.Sprintf("Version conflict for actor %q: attempted %d, current %d, will retry with %d", + versionErr.ActorID, versionErr.AttemptedVersion, versionErr.CurrentVersion, nextVersion) + + if !strings.Contains(logMsg, "order-abc") { + t.Errorf("application context logging failed: %s", logMsg) + } +} + +func TestVersionConflictError_EdgeCases(t *testing.T) { + testCases := []struct { + name string + actorID string + attemp int64 + current int64 + }{ + {"zero current", "actor-1", 1, 0}, + {"large numbers", "actor-2", 1000000, 999999}, + {"max int64", "actor-3", 9223372036854775807, 9223372036854775806}, + {"negative attempt", "actor-4", -1, -2}, + {"empty actor id", "", 1, 0}, + {"special chars in actor id", "actor@#$%", 2, 1}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := &VersionConflictError{ + ActorID: tc.actorID, + AttemptedVersion: tc.attemp, + CurrentVersion: tc.current, + } + + // Should not panic + msg := err.Error() + if msg == "" { + t.Error("Error() should return non-empty string") + } + + // Should be wrapped correctly + if err.Unwrap() != ErrVersionConflict { + t.Error("Unwrap should return ErrVersionConflict") + } + + // errors.Is should work + if !errors.Is(err, ErrVersionConflict) { + t.Error("errors.Is should work for edge case") + } + }) + } +} + +func TestErrVersionConflict_Sentinel(t *testing.T) { + // Verify the sentinel error is correctly defined + if ErrVersionConflict == nil { + t.Fatal("ErrVersionConflict should not be nil") + } + + expectedMsg := "version conflict" + if ErrVersionConflict.Error() != expectedMsg { + t.Errorf("ErrVersionConflict message mismatch: got %q, want %q", ErrVersionConflict.Error(), expectedMsg) + } + + // Test that it's usable with errors.Is + if !errors.Is(ErrVersionConflict, ErrVersionConflict) { + t.Error("ErrVersionConflict should match itself with errors.Is") + } +} diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..aaec482 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,189 @@ +# Aether Examples + +This directory contains examples demonstrating common patterns for using Aether. + +## Retry Patterns (`retry_patterns.go`) + +When saving events with optimistic concurrency control, your application may encounter `VersionConflictError` when multiple writers attempt to update the same actor concurrently. This file demonstrates several retry strategies. + +### Pattern Overview + +All retry patterns work with `VersionConflictError` which provides three critical fields: + +- **ActorID**: The actor that experienced the conflict +- **CurrentVersion**: The latest version in the store +- **AttemptedVersion**: The version you tried to save + +Your application can read these fields to make intelligent retry decisions. + +### Available Patterns + +#### SimpleRetryPattern + +The most basic pattern - just retry with exponential backoff: + +```go +// Automatically retries up to 3 times with exponential backoff +err := SimpleRetryPattern(store, "order-123", "OrderUpdated") +``` + +**Use when**: You want a straightforward retry mechanism without complex logic. 
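+
+The loop itself is just exponential doubling between attempts. A minimal sketch of the loop shape, where `trySave` is a stand-in for the get-version/build-event/save steps shown in the full example:
+
+```go
+maxRetries := 3
+backoff := 100 * time.Millisecond
+for attempt := 0; attempt < maxRetries; attempt++ {
+    if err := trySave(store, actorID, eventType); err == nil {
+        return nil // saved
+    } else if !errors.Is(err, aether.ErrVersionConflict) {
+        return err // not a conflict - don't retry
+    }
+    time.Sleep(backoff)
+    backoff *= 2 // double the wait after every conflict
+}
+return fmt.Errorf("gave up after %d conflicts", maxRetries)
+```
+
+With the defaults in `retry_patterns.go` (3 attempts, 100 ms initial backoff), the waits before the second and third attempts are 100 ms and 200 ms.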
+
+#### ConflictDetailedRetryPattern
+
+Extracts detailed information from the conflict error to make smarter decisions:
+
+```go
+// Detects thrashing (multiple conflicts at the same version)
+// and can implement circuit-breaker logic
+err := ConflictDetailedRetryPattern(store, "order-123", "OrderUpdated")
+```
+
+**Use when**: You need visibility into conflict patterns and want to detect system issues like thrashing.
+
+#### JitterRetryPattern
+
+Adds randomized jitter to prevent a "thundering herd" when multiple writers retry:
+
+```go
+// Exponential backoff with jitter prevents synchronized retries
+err := JitterRetryPattern(store, "order-123", "OrderUpdated")
+```
+
+**Use when**: You have high concurrency and want to prevent all writers from retrying at the same time.
+
+#### AdaptiveRetryPattern
+
+Adjusts backoff duration based on version distance (an indicator of contention):
+
+```go
+// Light contention (gap <= 3): 50ms backoff
+// Moderate contention (gap 4-10): backoff proportional to the gap
+// High contention (gap > 10): aggressive backoff
+err := AdaptiveRetryPattern(store, "order-123", "OrderUpdated")
+```
+
+**Use when**: You want the backoff strategy to respond to actual system load.
+
+#### EventualConsistencyPattern
+
+Instead of blocking on an in-line retry, queues the event for asynchronous retry:
+
+```go
+// Returns immediately; the event is queued for background retry
+EventualConsistencyPattern(store, retryQueue, event)
+
+// Background worker processes the queue
+for item := range retryQueue {
+    // Implement your own retry logic here
+}
+```
+
+**Use when**: You can't afford to block the request and background retry is acceptable.
+
+#### CircuitBreakerPattern
+
+Implements a circuit breaker to prevent cascading failures:
+
+```go
+cb := NewCircuitBreaker()
+
+// Fails fast when the circuit is open
+err := CircuitBreakerRetryPattern(store, cb, "order-123", "OrderUpdated")
+if err != nil && !cb.CanRetry() {
+    return ErrCircuitBreakerOpen // an application-defined sentinel
+}
+```
+
+**Use when**: You have a distributed system and want to prevent retry storms during outages.
+
+## Common Pattern: Extract and Log Context
+
+All patterns can read context from `VersionConflictError`:
+
+```go
+var versionErr *aether.VersionConflictError
+if errors.As(err, &versionErr) {
+    log.Printf(
+        "Conflict for actor %q: attempted %d, current %d",
+        versionErr.ActorID,
+        versionErr.AttemptedVersion,
+        versionErr.CurrentVersion,
+    )
+}
+```
+
+## Sentinel Error Check
+
+Check if an error is a version conflict without examining the struct:
+
+```go
+if errors.Is(err, aether.ErrVersionConflict) {
+    // This is a version conflict - retry is appropriate
+}
+```
+
+## Implementing Your Own Pattern
+
+Basic template:
+
+```go
+for attempt := 0; attempt < maxRetries; attempt++ {
+    // 1. Get current version
+    currentVersion, err := store.GetLatestVersion(actorID)
+    if err != nil {
+        return err
+    }
+
+    // 2. Create event with next version
+    event := &aether.Event{
+        ActorID: actorID,
+        Version: currentVersion + 1,
+        // ... other fields
+    }
+
+    // 3. Attempt save
+    err = store.SaveEvent(event)
+    if err == nil {
+        return nil // Success
+    }
+
+    // 4. Check if it's a conflict
+    if !errors.Is(err, aether.ErrVersionConflict) {
+        return err // Some other error
+    }
+
+    // 5. Implement your retry strategy
+    time.Sleep(yourBackoff(attempt)) // yourBackoff: whatever schedule fits your workload
+}
+```
+
+## Choosing a Pattern
+
+| Pattern | Latency | Throughput | Complexity | Use Case |
+|---------|---------|------------|------------|----------|
+| Simple | Low | Low | Very Low | Single writer, testing |
+| ConflictDetailed | Low | Medium | Medium | Debugging, monitoring |
+| Jitter | Low-Medium | High | Low | Multi-writer concurrency |
+| Adaptive | Low-Medium | High | Medium | Variable load scenarios |
+| EventualConsistency | Very Low | Very High | High | High-volume, async-OK workloads |
+| CircuitBreaker | Variable | Stable | High | Distributed, failure-resilient systems |
+
+## Performance Considerations
+
+1. **Backoff timing**: Shorter backoffs waste CPU on retries; longer backoffs increase latency
+2. **Retry limits**: Too few retries give up too early; too many waste resources
+3. **Jitter**: Essential for preventing synchronized retries in high-concurrency scenarios
+4. **Monitoring**: Track retry rates and conflict patterns to detect system issues
+
+## Testing
+
+Use `aether.NewInMemoryEventStore()` in tests:
+
+```go
+store := aether.NewInMemoryEventStore()
+err := SimpleRetryPattern(store, "test-actor", "TestEvent")
+if err != nil {
+    t.Fatalf("retry pattern failed: %v", err)
+}
+```
diff --git a/examples/retry_patterns.go b/examples/retry_patterns.go
new file mode 100644
index 0000000..a803a77
--- /dev/null
+++ b/examples/retry_patterns.go
@@ -0,0 +1,353 @@
+package examples
+
+import (
+	"errors"
+	"fmt"
+	"log"
+	"math"
+	"math/rand"
+	"time"
+
+	"git.flowmade.one/flowmade-one/aether"
+)
+
+// SimpleRetryPattern demonstrates a basic retry loop using VersionConflictError.
+//
+// This pattern is suitable for scenarios where you want to automatically retry
+// with exponential backoff when version conflicts occur.
+func SimpleRetryPattern(store aether.EventStore, actorID string, eventType string) error {
+	const maxRetries = 3
+	const initialBackoff = 100 * time.Millisecond
+
+	var event *aether.Event
+
+	for attempt := 0; attempt < maxRetries; attempt++ {
+		if attempt > 0 {
+			backoff := time.Duration(math.Pow(2, float64(attempt-1))) * initialBackoff
+			log.Printf("Retry attempt %d after %v", attempt, backoff)
+			time.Sleep(backoff)
+		}
+
+		// Get the current version for the actor
+		currentVersion, err := store.GetLatestVersion(actorID)
+		if err != nil {
+			return fmt.Errorf("failed to get latest version: %w", err)
+		}
+
+		// Create event with next version
+		event = &aether.Event{
+			ID:        fmt.Sprintf("evt-%d-%d", time.Now().UnixNano(), attempt),
+			EventType: eventType,
+			ActorID:   actorID,
+			Version:   currentVersion + 1,
+			Data:      map[string]interface{}{"attempt": attempt},
+			Timestamp: time.Now(),
+		}
+
+		// Attempt to save
+		if err := store.SaveEvent(event); err == nil {
+			log.Printf("Successfully saved event for actor %s at version %d", actorID, event.Version)
+			return nil
+		} else if !errors.Is(err, aether.ErrVersionConflict) {
+			// Some other error occurred
+			return fmt.Errorf("save event failed: %w", err)
+		}
+		// If it's a version conflict, loop will retry
+	}
+
+	return fmt.Errorf("failed to save event after %d retries", maxRetries)
+}
+
+// ConflictDetailedRetryPattern demonstrates how to extract detailed information
+// from VersionConflictError to make intelligent retry decisions.
+//
+// This pattern shows how to log detailed context and potentially implement
+// circuit-breaker logic based on the conflict information.
+func ConflictDetailedRetryPattern(store aether.EventStore, actorID string, eventType string) error { + const maxRetries = 5 + var lastConflictVersion int64 + + for attempt := 0; attempt < maxRetries; attempt++ { + // Get current version + currentVersion, err := store.GetLatestVersion(actorID) + if err != nil { + return err + } + + // Create event + event := &aether.Event{ + ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()), + EventType: eventType, + ActorID: actorID, + Version: currentVersion + 1, + Data: map[string]interface{}{"timestamp": time.Now()}, + Timestamp: time.Now(), + } + + // Attempt to save + err = store.SaveEvent(event) + if err == nil { + return nil // Success + } + + // Check if it's a version conflict + var versionErr *aether.VersionConflictError + if !errors.As(err, &versionErr) { + // Not a version conflict, fail immediately + return err + } + + // Extract detailed context from the conflict error + log.Printf( + "Version conflict for actor %q: attempted version %d, current version %d", + versionErr.ActorID, + versionErr.AttemptedVersion, + versionErr.CurrentVersion, + ) + + // Check for thrashing (multiple conflicts with same version) + if lastConflictVersion == versionErr.CurrentVersion && attempt > 0 { + log.Printf("Detected version thrashing - circuit breaker would trigger here") + return fmt.Errorf("circuit breaker: too many conflicts at version %d", versionErr.CurrentVersion) + } + lastConflictVersion = versionErr.CurrentVersion + + // Exponential backoff + backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond + time.Sleep(backoff) + } + + return fmt.Errorf("failed after %d retries", maxRetries) +} + +// JitterRetryPattern implements exponential backoff with jitter to prevent +// thundering herd when multiple writers retry simultaneously. +func JitterRetryPattern(store aether.EventStore, actorID string, eventType string) error { + const maxRetries = 3 + const baseBackoff = 100 * time.Millisecond + const maxJitter = 0.1 // 10% jitter + + for attempt := 0; attempt < maxRetries; attempt++ { + currentVersion, err := store.GetLatestVersion(actorID) + if err != nil { + return err + } + + event := &aether.Event{ + ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()), + EventType: eventType, + ActorID: actorID, + Version: currentVersion + 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + err = store.SaveEvent(event) + if err == nil { + return nil + } + + if !errors.Is(err, aether.ErrVersionConflict) { + return err + } + + // Exponential backoff with jitter + exponentialBackoff := time.Duration(math.Pow(2, float64(attempt))) * baseBackoff + jitter := time.Duration(rand.Float64() * float64(exponentialBackoff) * maxJitter) + totalBackoff := exponentialBackoff + jitter + + log.Printf("Retrying in %v (attempt %d/%d)", totalBackoff, attempt+1, maxRetries) + time.Sleep(totalBackoff) + } + + return fmt.Errorf("failed after %d retries", maxRetries) +} + +// AdaptiveRetryPattern adjusts retry strategy based on version conflict patterns. +// +// This pattern demonstrates how application logic can use CurrentVersion to +// decide whether to retry, give up, or escalate to a higher-level handler. 
+func AdaptiveRetryPattern(store aether.EventStore, actorID string, eventType string) error { + const maxRetries = 3 + + for attempt := 0; attempt < maxRetries; attempt++ { + currentVersion, err := store.GetLatestVersion(actorID) + if err != nil { + return err + } + + event := &aether.Event{ + ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()), + EventType: eventType, + ActorID: actorID, + Version: currentVersion + 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + err = store.SaveEvent(event) + if err == nil { + return nil + } + + var versionErr *aether.VersionConflictError + if !errors.As(err, &versionErr) { + return err + } + + // Adaptive backoff based on version distance + versionDistance := versionErr.CurrentVersion - versionErr.AttemptedVersion + if versionDistance > 10 { + // Many concurrent writers - back off more aggressively + log.Printf("High contention detected (gap: %d), aggressive backoff", versionDistance) + time.Sleep(time.Duration(versionDistance*10) * time.Millisecond) + } else if versionDistance > 3 { + // Moderate contention - normal backoff + log.Printf("Moderate contention detected (gap: %d)", versionDistance) + time.Sleep(time.Duration(versionDistance) * time.Millisecond) + } else { + // Light contention - minimal backoff + log.Printf("Light contention detected") + time.Sleep(50 * time.Millisecond) + } + } + + return fmt.Errorf("failed after %d retries", maxRetries) +} + +// EventualConsistencyPattern demonstrates how to handle version conflicts +// in an eventually consistent manner by publishing to a retry queue. +// +// This is useful when immediate retry is not feasible, and you want to +// defer the operation to a background worker. +type RetryQueueItem struct { + Event *aether.Event + ConflictVersion int64 + ConflictAttempted int64 + NextRetryTime time.Time + FailureCount int +} + +func EventualConsistencyPattern(store aether.EventStore, retryQueue chan<- RetryQueueItem, event *aether.Event) { + err := store.SaveEvent(event) + if err == nil { + return + } + + var versionErr *aether.VersionConflictError + if !errors.As(err, &versionErr) { + log.Printf("Non-retryable error: %v", err) + return + } + + // Queue for retry - background worker will process this + retryItem := RetryQueueItem{ + Event: event, + ConflictVersion: versionErr.CurrentVersion, + ConflictAttempted: versionErr.AttemptedVersion, + NextRetryTime: time.Now().Add(1 * time.Second), + FailureCount: 0, + } + + select { + case retryQueue <- retryItem: + log.Printf("Queued event for retry: actor=%s", event.ActorID) + case <-time.After(5 * time.Second): + log.Printf("Failed to queue event for retry (queue full)") + } +} + +// CircuitBreakerPattern implements a simple circuit breaker for version conflicts. +// +// The circuit breaker tracks failure rates and temporarily stops retrying +// when the failure rate gets too high, allowing the system to recover. 
+type CircuitBreaker struct { + failureCount int + successCount int + state string // "closed", "open", "half-open" + lastFailureTime time.Time + openDuration time.Duration + failureThreshold int + successThreshold int +} + +func NewCircuitBreaker() *CircuitBreaker { + return &CircuitBreaker{ + state: "closed", + openDuration: 30 * time.Second, + failureThreshold: 5, + successThreshold: 3, + } +} + +func (cb *CircuitBreaker) RecordSuccess() { + if cb.state == "half-open" { + cb.successCount++ + if cb.successCount >= cb.successThreshold { + cb.state = "closed" + cb.failureCount = 0 + cb.successCount = 0 + log.Printf("Circuit breaker closed") + } + } +} + +func (cb *CircuitBreaker) RecordFailure() { + cb.lastFailureTime = time.Now() + cb.failureCount++ + if cb.failureCount >= cb.failureThreshold { + cb.state = "open" + log.Printf("Circuit breaker opened") + } +} + +func (cb *CircuitBreaker) CanRetry() bool { + if cb.state == "closed" { + return true + } + if cb.state == "open" { + if time.Since(cb.lastFailureTime) > cb.openDuration { + cb.state = "half-open" + cb.failureCount = 0 + cb.successCount = 0 + log.Printf("Circuit breaker half-open") + return true + } + return false + } + // half-open state allows retries + return true +} + +func CircuitBreakerRetryPattern(store aether.EventStore, cb *CircuitBreaker, actorID string, eventType string) error { + if !cb.CanRetry() { + return fmt.Errorf("circuit breaker open - not retrying") + } + + currentVersion, err := store.GetLatestVersion(actorID) + if err != nil { + return err + } + + event := &aether.Event{ + ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()), + EventType: eventType, + ActorID: actorID, + Version: currentVersion + 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + err = store.SaveEvent(event) + if err == nil { + cb.RecordSuccess() + return nil + } + + if !errors.Is(err, aether.ErrVersionConflict) { + return err + } + + cb.RecordFailure() + return fmt.Errorf("save failed with version conflict, circuit breaker status: %s", cb.state) +}
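+
+// circuitBreakerUsageSketch is an illustrative driver, not part of the aether
+// API: it shows one way an application might combine CircuitBreakerRetryPattern
+// with the breaker's CanRetry check, stopping early once the breaker opens.
+// The "OrderUpdated" event type simply mirrors the README examples.
+func circuitBreakerUsageSketch(store aether.EventStore, cb *CircuitBreaker, actorIDs []string) {
+	for _, actorID := range actorIDs {
+		if !cb.CanRetry() {
+			log.Printf("circuit breaker open - stopping before actor %s", actorID)
+			return
+		}
+		if err := CircuitBreakerRetryPattern(store, cb, actorID, "OrderUpdated"); err != nil {
+			log.Printf("save not completed for actor %s: %v", actorID, err)
+		}
+	}
+}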