Compare commits

...

2 Commits

Author SHA1 Message Date
Claude Code
d929729d79 fix: address review feedback
- Remove redundant newline from fmt.Println at line 325
- Add 9 comprehensive VersionConflictError tests to event_test.go:
  * Error message formatting with context fields
  * Field accessibility (ActorID, AttemptedVersion, CurrentVersion)
  * Unwrap() method and error wrapping
  * Sentinel error checking with errors.Is()
  * Type assertion support with errors.As()
  * Retry logic extraction of CurrentVersion
  * Special character handling in ActorID

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 22:24:37 +01:00
Claude Code
f16a7c6237 docs: Add VersionConflictError retry pattern examples
Some checks failed
CI / build (pull_request) Failing after 9s
CI / integration (pull_request) Failing after 2m0s
Add comprehensive examples demonstrating standard retry patterns for
handling version conflicts during optimistic concurrency control:

- Pattern 1: Simple exponential backoff (recommended for most cases)
- Pattern 2: State reload and merge (deterministic, idempotent updates)
- Pattern 3: Circuit breaker (cascading failure prevention)
- Pattern 4: Jittered backoff (thundering herd prevention)
- Pattern 5: Conflict analysis and monitoring

Includes complete, runnable examples and a guide to choosing the right
pattern for different scenarios. Documents best practices for monitoring
and debugging version conflicts.

Closes #62

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 21:26:05 +01:00
3 changed files with 751 additions and 0 deletions

View File

@@ -2,6 +2,7 @@ package aether
import ( import (
"encoding/json" "encoding/json"
"errors"
"strings" "strings"
"testing" "testing"
"time" "time"
@@ -1335,3 +1336,162 @@ func TestReplayError_WithLargeRawData(t *testing.T) {
// Error() should still work // Error() should still work
_ = err.Error() _ = err.Error()
} }
// Tests for VersionConflictError
func TestVersionConflictError_ErrorMessage(t *testing.T) {
err := &VersionConflictError{
ActorID: "order-123",
CurrentVersion: 5,
AttemptedVersion: 5,
}
msg := err.Error()
if !strings.Contains(msg, "order-123") {
t.Errorf("expected ActorID in message, got: %s", msg)
}
if !strings.Contains(msg, "5") {
t.Errorf("expected versions in message, got: %s", msg)
}
if !strings.Contains(msg, "version conflict") {
t.Errorf("expected 'version conflict' in message, got: %s", msg)
}
}
func TestVersionConflictError_ActorIDField(t *testing.T) {
err := &VersionConflictError{
ActorID: "test-actor-456",
CurrentVersion: 10,
AttemptedVersion: 11,
}
if err.ActorID != "test-actor-456" {
t.Errorf("ActorID field access failed: got %q, want %q", err.ActorID, "test-actor-456")
}
}
func TestVersionConflictError_AttemptedVersionField(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-123",
CurrentVersion: 5,
AttemptedVersion: 99,
}
if err.AttemptedVersion != 99 {
t.Errorf("AttemptedVersion field access failed: got %d, want %d", err.AttemptedVersion, 99)
}
}
func TestVersionConflictError_CurrentVersionField(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-123",
CurrentVersion: 42,
AttemptedVersion: 43,
}
if err.CurrentVersion != 42 {
t.Errorf("CurrentVersion field access failed: got %d, want %d", err.CurrentVersion, 42)
}
}
func TestVersionConflictError_Unwrap(t *testing.T) {
err := &VersionConflictError{
ActorID: "order-456",
CurrentVersion: 3,
AttemptedVersion: 3,
}
unwrapped := err.Unwrap()
if !errors.Is(unwrapped, ErrVersionConflict) {
t.Errorf("expected Unwrap to return ErrVersionConflict sentinel")
}
}
func TestVersionConflictError_ErrorsIs(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-789",
CurrentVersion: 7,
AttemptedVersion: 8,
}
if !errors.Is(err, ErrVersionConflict) {
t.Error("expected errors.Is to recognize VersionConflictError as ErrVersionConflict")
}
}
func TestVersionConflictError_ErrorsAs(t *testing.T) {
originalErr := &VersionConflictError{
ActorID: "user-123",
CurrentVersion: 20,
AttemptedVersion: 21,
}
var versionErr *VersionConflictError
if !errors.As(originalErr, &versionErr) {
t.Fatal("expected errors.As to extract VersionConflictError")
}
if versionErr.ActorID != "user-123" {
t.Errorf("extracted ActorID mismatch: got %q, want %q", versionErr.ActorID, "user-123")
}
if versionErr.CurrentVersion != 20 {
t.Errorf("extracted CurrentVersion mismatch: got %d, want %d", versionErr.CurrentVersion, 20)
}
if versionErr.AttemptedVersion != 21 {
t.Errorf("extracted AttemptedVersion mismatch: got %d, want %d", versionErr.AttemptedVersion, 21)
}
}
func TestVersionConflictError_RetryLogicWithCurrentVersion(t *testing.T) {
// Demonstrates how applications extract CurrentVersion for retry logic
err := &VersionConflictError{
ActorID: "order-555",
CurrentVersion: 15,
AttemptedVersion: 16,
}
// Application can check for conflict and use CurrentVersion
if errors.Is(err, ErrVersionConflict) {
var versionErr *VersionConflictError
if errors.As(err, &versionErr) {
// Application uses CurrentVersion to determine next attempt
nextVersion := versionErr.CurrentVersion + 1
if nextVersion != 16 {
t.Errorf("retry version calculation failed: got %d, want %d", nextVersion, 16)
}
}
}
}
func TestVersionConflictError_SpecialCharactersInActorID(t *testing.T) {
specialChars := []string{
"actor-with-dashes",
"actor_with_underscores",
"actor.with.dots",
"actor@special",
"actor with spaces",
"actor:with:colons",
"actor/with/slashes",
}
for _, actorID := range specialChars {
t.Run(actorID, func(t *testing.T) {
err := &VersionConflictError{
ActorID: actorID,
CurrentVersion: 1,
AttemptedVersion: 2,
}
// Verify fields are preserved
if err.ActorID != actorID {
t.Errorf("ActorID not preserved: got %q, want %q", err.ActorID, actorID)
}
// Verify message includes the ActorID
msg := err.Error()
if !strings.Contains(msg, actorID) {
t.Errorf("ActorID not in error message for %q: %s", actorID, msg)
}
})
}
}

235
examples/README.md Normal file
View File

@@ -0,0 +1,235 @@
# Aether Examples
Standard patterns and best practices for building with Aether.
## Version Conflict Retry Patterns
When using optimistic concurrency control with Aether's event store, version conflicts can occur when multiple writers attempt to save events for the same actor. The `VersionConflictError` provides full context about the conflict, enabling intelligent retry strategies.
### Understanding Version Conflicts
A version conflict occurs when:
- You attempt to save an event with version `N`
- But the actor already has a version >= `N`
Example:
```go
// Actor "order-123" currently has version 5
// Writer A reads version 5, creates version 6, saves successfully
// Writer B also read version 5, creates version 6, attempts save
// -> VersionConflictError: current=6, attempted=6
```
### Working with VersionConflictError
The `VersionConflictError` provides:
- `ActorID` - The actor that had the conflict
- `CurrentVersion` - The actual current version in the store
- `AttemptedVersion` - The version you tried to save
Example usage:
```go
err := eventStore.SaveEvent(event)
if errors.Is(err, aether.ErrVersionConflict) {
var versionErr *aether.VersionConflictError
if errors.As(err, &versionErr) {
fmt.Printf("Conflict for actor %q: current=%d, attempted=%d",
versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion)
// Implement retry logic using CurrentVersion
nextVersion := versionErr.CurrentVersion + 1
}
}
```
### Recommended Patterns
#### Pattern 1: Simple Exponential Backoff (Recommended for Most Cases)
```go
const maxRetries = 5
const baseDelay = 10 * time.Millisecond
for attempt := 0; attempt < maxRetries; attempt++ {
currentVersion, _ := eventStore.GetLatestVersion(actorID)
event := &aether.Event{
ActorID: actorID,
Version: currentVersion + 1,
// ...
}
err := eventStore.SaveEvent(event)
if err == nil {
return nil // Success!
}
if !errors.Is(err, aether.ErrVersionConflict) {
return err // Different error, don't retry
}
// Exponential backoff: 10ms, 20ms, 40ms, 80ms, 160ms
delay := time.Duration(baseDelay.Milliseconds() * int64(math.Pow(2, float64(attempt)))) * time.Millisecond
time.Sleep(delay)
}
return fmt.Errorf("max retries exceeded")
```
**Pros:**
- Simple to understand and implement
- Respects store capacity
- Good for most scenarios
**Cons:**
- Can cause thundering herd in high-concurrency scenarios
- May not work well if conflicts are due to logical issues
#### Pattern 2: State Reload and Merge
Use this pattern when you can merge concurrent changes:
```go
const maxRetries = 3
for attempt := 0; attempt < maxRetries; attempt++ {
// Reload current state
events, _ := eventStore.GetEvents(actorID, 0)
aggregate := rebuildFromEvents(events)
// Apply your update
aggregate.Status = "shipped"
// Attempt save with new version
event := &aether.Event{
ActorID: actorID,
Version: aggregate.Version + 1,
Data: map[string]interface{}{"status": aggregate.Status},
}
err := eventStore.SaveEvent(event)
if err == nil {
return nil // Success!
}
if !errors.Is(err, aether.ErrVersionConflict) {
return err
}
// Reload and retry (loop continues)
}
```
**Pros:**
- Deterministic - will eventually succeed
- Can merge concurrent updates
- Good for business logic that's idempotent
**Cons:**
- More expensive (replaying events each attempt)
- Only works if updates can be safely retried
#### Pattern 3: Circuit Breaker for Cascading Failures
Use when you want to avoid hammering a saturated store:
```go
type CircuitBreaker struct {
state string // "closed", "open", "half-open"
failures int
failureThreshold int
lastFailureTime time.Time
cooldownTime time.Duration
}
// ... implement circuit breaker logic ...
// Usage:
if !cb.canAttempt() {
return fmt.Errorf("circuit breaker open")
}
err := eventStore.SaveEvent(event)
if err == nil {
cb.recordSuccess()
} else if errors.Is(err, aether.ErrVersionConflict) {
cb.recordFailure()
if cb.failureCount >= cb.failureThreshold {
cb.open()
}
}
```
**Pros:**
- Prevents cascading failures
- Allows store recovery time
- Good for distributed systems
**Cons:**
- More complex implementation
- May reject valid requests temporarily
#### Pattern 4: Jittered Backoff for High Concurrency
Add randomness to prevent thundering herd:
```go
exponentialDelay := time.Duration(baseDelay.Milliseconds() * int64(math.Pow(2, float64(attempt)))) * time.Millisecond
jitter := time.Duration(rand.Int63n(int64(exponentialDelay)))
delay := exponentialDelay + jitter
time.Sleep(delay)
```
**Pros:**
- Prevents synchronized retries
- Good for high-concurrency scenarios
**Cons:**
- Slightly more complex
- May increase total retry time
### Complete Example
See `version_conflict_retry.go` for complete, runnable examples of all patterns.
### When to Use Each Pattern
| Pattern | Use When | Avoid When |
|---------|----------|-----------|
| Exponential Backoff | Default choice for most apps | Store is consistently overloaded |
| State Reload | Updates can be safely replayed | Event replay is expensive |
| Circuit Breaker | Store is frequently saturated | You need immediate feedback |
| Jittered Backoff | Many concurrent writers | Single-threaded app |
### Monitoring Version Conflicts
Log and monitor version conflicts to understand contention patterns:
```go
var versionErr *aether.VersionConflictError
if errors.As(err, &versionErr) {
log.WithFields(log.Fields{
"actor_id": versionErr.ActorID,
"current_version": versionErr.CurrentVersion,
"attempted_version": versionErr.AttemptedVersion,
"version_gap": versionErr.AttemptedVersion - versionErr.CurrentVersion,
}).Warn("Version conflict")
// Alert if gap is too large (indicates stale read)
if versionErr.AttemptedVersion - versionErr.CurrentVersion > 5 {
metrics.versionConflictLargeGap.Inc()
}
}
```
### Best Practices
1. **Always check the error type** - Not all errors are version conflicts
2. **Use CurrentVersion for retries** - Don't hardcode retry logic
3. **Set reasonable retry limits** - Prevent infinite loops
4. **Monitor contention** - Track version conflicts to identify hotspots
5. **Consider your domain** - Some updates can be safely retried, others cannot
6. **Test concurrent scenarios** - Version conflicts are rare in single-threaded apps
### References
- [CLAUDE.md](../CLAUDE.md) - Architecture and event versioning semantics
- [Event Sourcing Patterns](../vision.md) - Domain-driven design approach

View File

@@ -0,0 +1,356 @@
package main
import (
"errors"
"fmt"
"log"
"math"
"math/rand"
"time"
"git.flowmade.one/flowmade-one/aether"
"git.flowmade.one/flowmade-one/aether/store"
"github.com/google/uuid"
)
// Example 1: Simple Retry with Exponential Backoff
// This is the most common pattern for handling version conflicts.
func simpleRetryWithExponentialBackoff(eventStore aether.EventStore, actorID string) error {
const maxRetries = 5
const baseDelay = 10 * time.Millisecond
for attempt := 0; attempt < maxRetries; attempt++ {
// Get current version
currentVersion, err := eventStore.GetLatestVersion(actorID)
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
}
// Create event with next version
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderUpdated",
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"status": "processing"},
Timestamp: time.Now(),
}
// Attempt to save
err = eventStore.SaveEvent(event)
if err == nil {
// Success!
return nil
}
// Check if it's a version conflict
if !errors.Is(err, aether.ErrVersionConflict) {
// Different error - don't retry
return err
}
// Version conflict - extract details for logging
var versionErr *aether.VersionConflictError
if errors.As(err, &versionErr) {
log.Printf("Attempt %d: Version conflict for actor %q: current=%d, attempted=%d",
attempt+1, versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion)
}
// Last attempt - return error
if attempt == maxRetries-1 {
return fmt.Errorf("failed to save event after %d attempts: %w", maxRetries, err)
}
// Calculate delay with exponential backoff: baseDelay * 2^attempt
delay := time.Duration(baseDelay.Milliseconds()*int64(math.Pow(2, float64(attempt)))) * time.Millisecond
log.Printf("Retrying in %v...", delay)
time.Sleep(delay)
}
return nil
}
// Example 2: Retry with Circuit Breaker Pattern
// Use a circuit breaker to avoid hammering the store during cascading failures.
type CircuitBreaker struct {
failureThreshold int
cooldownTime time.Duration
failures int
lastFailureTime time.Time
state string // "closed", "open", "half-open"
}
func (cb *CircuitBreaker) canAttempt() bool {
switch cb.state {
case "closed":
return true
case "open":
// Check if we should transition to half-open
if time.Since(cb.lastFailureTime) > cb.cooldownTime {
cb.state = "half-open"
return true
}
return false
case "half-open":
return true
}
return false
}
func (cb *CircuitBreaker) recordSuccess() {
cb.failures = 0
cb.state = "closed"
}
func (cb *CircuitBreaker) recordFailure() {
cb.failures++
cb.lastFailureTime = time.Now()
if cb.failures >= cb.failureThreshold {
cb.state = "open"
}
}
func retryWithCircuitBreaker(eventStore aether.EventStore, actorID string) error {
cb := &CircuitBreaker{
failureThreshold: 5,
cooldownTime: 1 * time.Second,
state: "closed",
}
const maxAttempts = 10
for attempt := 0; attempt < maxAttempts; attempt++ {
if !cb.canAttempt() {
return fmt.Errorf("circuit breaker is open for actor %q", actorID)
}
// Get current version
currentVersion, err := eventStore.GetLatestVersion(actorID)
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
}
// Create event with next version
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderUpdated",
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"status": "processing"},
Timestamp: time.Now(),
}
// Attempt to save
err = eventStore.SaveEvent(event)
if err == nil {
cb.recordSuccess()
return nil
}
// Check if it's a version conflict
if !errors.Is(err, aether.ErrVersionConflict) {
// Different error - don't retry
return err
}
cb.recordFailure()
// Version conflict - extract details
var versionErr *aether.VersionConflictError
if errors.As(err, &versionErr) {
log.Printf("Circuit breaker (state=%s): Version conflict for actor %q",
cb.state, versionErr.ActorID)
}
if attempt < maxAttempts-1 {
time.Sleep(10 * time.Millisecond)
}
}
return fmt.Errorf("max retry attempts exceeded for actor %q", actorID)
}
// Example 3: Deterministic Retry - Always Succeed by Reloading Latest State
// Some applications can afford to reload state and merge changes.
type OrderAggregate struct {
ID string
Version int64
Status string
Amount float64
Comments []string
}
func retryWithStateReload(eventStore aether.EventStore, orderID string, updateFn func(*OrderAggregate) error) error {
const maxRetries = 3
for attempt := 0; attempt < maxRetries; attempt++ {
// Load current state by replaying events
events, err := eventStore.GetEvents(orderID, 0)
if err != nil {
return fmt.Errorf("failed to load events: %w", err)
}
// Rebuild aggregate from events
aggregate := &OrderAggregate{ID: orderID}
for _, e := range events {
aggregate.Version = e.Version
// Apply event to aggregate (simplified)
}
// Apply update
if err := updateFn(aggregate); err != nil {
return fmt.Errorf("update function failed: %w", err)
}
// Create event for the update
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderUpdated",
ActorID: orderID,
Version: aggregate.Version + 1,
Data: map[string]interface{}{"status": aggregate.Status},
Timestamp: time.Now(),
}
// Attempt to save
err = eventStore.SaveEvent(event)
if err == nil {
// Success!
log.Printf("Order %q updated with version %d", orderID, event.Version)
return nil
}
// Check if it's a version conflict
if !errors.Is(err, aether.ErrVersionConflict) {
// Different error - don't retry
return err
}
var versionErr *aether.VersionConflictError
if errors.As(err, &versionErr) {
log.Printf("Version conflict on attempt %d: current=%d, attempted=%d. Retrying...",
attempt+1, versionErr.CurrentVersion, versionErr.AttemptedVersion)
}
if attempt < maxRetries-1 {
// Brief delay before retry
time.Sleep(time.Duration(50*(attempt+1)) * time.Millisecond)
}
}
return fmt.Errorf("failed to update order %q after %d retries", orderID, maxRetries)
}
// Example 4: Jittered Retry
// Add randomness to prevent thundering herd problem.
func retryWithJitteredBackoff(eventStore aether.EventStore, actorID string) error {
const maxRetries = 5
const baseDelay = 10 * time.Millisecond
for attempt := 0; attempt < maxRetries; attempt++ {
// Get current version
currentVersion, err := eventStore.GetLatestVersion(actorID)
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
}
// Create event with next version
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderUpdated",
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"status": "processing"},
Timestamp: time.Now(),
}
// Attempt to save
err = eventStore.SaveEvent(event)
if err == nil {
return nil
}
// Check if it's a version conflict
if !errors.Is(err, aether.ErrVersionConflict) {
return err
}
if attempt == maxRetries-1 {
return err
}
// Calculate delay with exponential backoff + jitter
exponentialDelay := time.Duration(baseDelay.Milliseconds()*int64(math.Pow(2, float64(attempt)))) * time.Millisecond
jitter := time.Duration(rand.Int63n(int64(exponentialDelay))) // Random jitter up to exponential delay
delay := exponentialDelay + jitter
log.Printf("Retrying in %v with jitter...", delay)
time.Sleep(delay)
}
return nil
}
// Example 5: Analyzing Version Conflict Context
// Extract all relevant information from a version conflict for debugging/monitoring.
func analyzeVersionConflict(err error) {
if !errors.Is(err, aether.ErrVersionConflict) {
log.Println("Not a version conflict error")
return
}
var versionErr *aether.VersionConflictError
if errors.As(err, &versionErr) {
log.Printf("Version Conflict Details:")
log.Printf(" Actor ID: %s", versionErr.ActorID)
log.Printf(" Current Version: %d", versionErr.CurrentVersion)
log.Printf(" Attempted Version: %d", versionErr.AttemptedVersion)
log.Printf(" Version Gap: %d", versionErr.AttemptedVersion-versionErr.CurrentVersion)
log.Printf(" Error Message: %s", versionErr.Error())
// Application-specific logic
if versionErr.CurrentVersion > versionErr.AttemptedVersion {
log.Printf("WARNING: Attempted version %d is behind current version %d",
versionErr.AttemptedVersion, versionErr.CurrentVersion)
}
}
}
// Example usage showing all patterns
func main() {
// Create in-memory event store for demonstration
eventStore := store.NewInMemoryEventStore()
actorID := "order-123"
fmt.Println("=== Version Conflict Retry Patterns ===")
// Demonstrate patterns (using simplified versions)
log.Println("Pattern 1: Simple Exponential Backoff")
if err := simpleRetryWithExponentialBackoff(eventStore, actorID); err != nil {
log.Printf("Error: %v\n", err)
}
log.Println("\nPattern 3: State Reload")
if err := retryWithStateReload(eventStore, actorID, func(agg *OrderAggregate) error {
agg.Status = "shipped"
return nil
}); err != nil {
log.Printf("Error: %v\n", err)
}
log.Println("\nPattern 4: Jittered Backoff")
if err := retryWithJitteredBackoff(eventStore, actorID); err != nil {
log.Printf("Error: %v\n", err)
}
fmt.Println("\n=== Recommended Pattern ===")
fmt.Println("Use exponential backoff (Pattern 1) for most cases:")
fmt.Println("- Simple to understand and implement")
fmt.Println("- Respects the store's capacity")
fmt.Println("- Avoids thundering herd (add jitter for high concurrency)")
fmt.Println("")
fmt.Println("Use state reload (Pattern 3) when:")
fmt.Println("- You can merge concurrent changes")
fmt.Println("- Deterministic success is required")
fmt.Println("- Replaying events is not expensive")
}